diff --git a/docs/source/services/replay/4_api.rst b/docs/source/services/replay/4_api.rst index 2db124b8..eaccac39 100644 --- a/docs/source/services/replay/4_api.rst +++ b/docs/source/services/replay/4_api.rst @@ -112,6 +112,117 @@ Finds keyframes in a video stream. Returns found keyframes from oldest to newest - ``{"error" => "Reason"}`` - problem description +Get Keyframe by UUID +-------------------- + +Retrieves a specific keyframe by UUID and returns a multipart response containing metadata and binary data parts. + +.. code-block:: bash + + curl "http://127.0.0.1:8080/api/v1/keyframe/{uuid}?source_id=in-video" \ + -o keyframe.multipart + +.. list-table:: + :header-rows: 1 + + * - Parameter + - Description + - Extra + - Description + * - Method + - GET + - /api/v1/keyframe/{uuid} + - returns multipart/mixed response + * - ``uuid`` (path) + - string + - required + - Keyframe UUID (obtained from ``find_keyframes``) + * - ``source_id`` (query) + - string + - required + - video source identifier + * - ``metadata_format`` (query) + - string + - optional, default: ``native`` + - Format for metadata: ``json`` (human-readable) or ``native`` (Savant Message protobuf) + * - Response OK + - 200 OK + - multipart/mixed + - metadata + data parts + * - Not Found + - 404 Not Found + - JSON + - UUID not found for this source + * - Invalid Request + - 400 Bad Request + - JSON + - invalid UUID format or empty source id + * - Internal Error + - 500 Internal Server Error + - JSON + - database or decoding issue + +**Response Format (200 OK)** + +The response uses ``multipart/mixed`` format with the following parts in order: + +1. **Metadata part**: Video frame metadata serialized in the requested format: + + - ``native`` (default): ``Content-Type: application/x-protobuf`` - Savant Message protobuf, compatible with ``load_message_from_bytes()`` + - ``json``: ``Content-Type: application/json`` - JSON-serialized VideoFrame metadata + +2. **Data parts** (``Content-Type: application/octet-stream``): Zero or more binary data blobs (encoded frame content, external references, etc.) + +Each data part includes an ``index`` attribute in the Content-Disposition header indicating its position (0, 1, 2, ...). The semantic meaning of each index is application-specific and depends on the upstream producer. + +**Usage with native format (default):** + +For clients using savant-rs, the native format eliminates the need for JSON parsing: + +.. code-block:: python + + from savant_rs.utils.serialization import load_message_from_bytes + + # Parse metadata part from multipart response + message = load_message_from_bytes(metadata_bytes) + video_frame = message.as_video_frame() + +Example response structure (with ``metadata_format=json``): + +.. code-block:: text + + --savant-frame- + Content-Type: application/json + Content-Disposition: inline; name="metadata" + Content-Length: + + {"uuid": "...", "source_id": "...", "pts": ..., ...} + --savant-frame- + Content-Type: application/octet-stream + Content-Disposition: inline; name="data"; index="0" + Content-Length: + + + --savant-frame--- + +Example response structure (with ``metadata_format=native``, default): + +.. code-block:: text + + --savant-frame- + Content-Type: application/x-protobuf + Content-Disposition: inline; name="metadata" + Content-Length: + + + --savant-frame- + Content-Type: application/octet-stream + Content-Disposition: inline; name="data"; index="0" + Content-Length: + + + --savant-frame--- + Create New Job -------------- @@ -787,4 +898,3 @@ Updates the stop condition of the running job matching the given UUID. - 500 Internal Server Error - ``{"error" => "Reason"}`` - problem description - diff --git a/services/replay/replay/Cargo.toml b/services/replay/replay/Cargo.toml index 112c563f..2ff094ba 100644 --- a/services/replay/replay/Cargo.toml +++ b/services/replay/replay/Cargo.toml @@ -18,6 +18,7 @@ anyhow = { workspace = true } env_logger = { workspace = true } log = { workspace = true } replaydb = { workspace = true } +savant_core = { workspace = true } tokio = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/services/replay/replay/src/main.rs b/services/replay/replay/src/main.rs index 61f17fae..8143353e 100644 --- a/services/replay/replay/src/main.rs +++ b/services/replay/replay/src/main.rs @@ -11,6 +11,7 @@ use replaydb::service::JobManager; use crate::web_service::del_job::delete_job; use crate::web_service::find_keyframes::find_keyframes; +use crate::web_service::get_keyframe_by_uuid::get_keyframe_by_uuid; use crate::web_service::list_jobs::{list_job, list_jobs, list_stopped_jobs}; use crate::web_service::new_job::new_job; use crate::web_service::shutdown::shutdown; @@ -51,6 +52,7 @@ async fn main() -> Result<()> { .app_data(http_job_service.clone()) .service(status) .service(shutdown) + .service(get_keyframe_by_uuid) .service(find_keyframes) .service(list_stopped_jobs) .service(list_job) diff --git a/services/replay/replay/src/web_service.rs b/services/replay/replay/src/web_service.rs index d0f67115..9e81e463 100644 --- a/services/replay/replay/src/web_service.rs +++ b/services/replay/replay/src/web_service.rs @@ -10,6 +10,7 @@ use replaydb::service::rocksdb_service::RocksDbService; pub mod del_job; pub mod find_keyframes; +pub mod get_keyframe_by_uuid; pub mod list_jobs; pub mod new_job; pub mod shutdown; diff --git a/services/replay/replay/src/web_service/get_keyframe_by_uuid.rs b/services/replay/replay/src/web_service/get_keyframe_by_uuid.rs new file mode 100644 index 00000000..13bada75 --- /dev/null +++ b/services/replay/replay/src/web_service/get_keyframe_by_uuid.rs @@ -0,0 +1,207 @@ +use actix_web::{get, web, HttpResponse, Responder}; +use log::{debug, warn}; +use savant_core::json_api::ToSerdeJsonValue; +use savant_core::message::Message; +use savant_core::protobuf::serialize as protobuf_serialize; +use serde::Deserialize; +use uuid::Uuid; + +use crate::web_service::JobService; + +#[derive(Debug, Deserialize, Default, Clone, Copy, PartialEq)] +#[serde(rename_all = "lowercase")] +enum MetadataFormat { + Json, + #[default] + Native, +} + +#[derive(Debug, Deserialize)] +struct GetKeyframeQuery { + source_id: String, + #[serde(default)] + metadata_format: MetadataFormat, +} + +fn write_boundary(body: &mut Vec, boundary: &str, is_final: bool) { + body.extend_from_slice(b"--"); + body.extend_from_slice(boundary.as_bytes()); + if is_final { + body.extend_from_slice(b"--"); + } + body.extend_from_slice(b"\r\n"); +} + +fn build_multipart_response( + message: &Message, + data: &[Vec], + metadata_format: MetadataFormat, +) -> Result<(String, Vec), String> { + let boundary = format!("savant-frame-{}", Uuid::new_v4().as_simple()); + let mut body = Vec::new(); + + write_boundary(&mut body, &boundary, false); + + // Serialize metadata based on format + let (content_type, metadata_bytes) = match metadata_format { + MetadataFormat::Json => { + let video_frame = message + .as_video_frame() + .ok_or_else(|| "Message is not a VideoFrame".to_string())?; + let json = video_frame.to_serde_json_value(); + let json_bytes = serde_json::to_vec(&json) + .map_err(|e| format!("JSON serialization failed: {}", e))?; + ("application/json", json_bytes) + } + MetadataFormat::Native => { + let native_bytes = protobuf_serialize(message) + .map_err(|e| format!("Protobuf serialization failed: {}", e))?; + ("application/x-protobuf", native_bytes) + } + }; + + body.extend_from_slice(format!("Content-Type: {}\r\n", content_type).as_bytes()); + body.extend_from_slice(b"Content-Disposition: inline; name=\"metadata\"\r\n"); + body.extend_from_slice(format!("Content-Length: {}\r\n\r\n", metadata_bytes.len()).as_bytes()); + body.extend_from_slice(&metadata_bytes); + body.extend_from_slice(b"\r\n"); + + for (idx, item) in data.iter().enumerate() { + write_boundary(&mut body, &boundary, false); + body.extend_from_slice(b"Content-Type: application/octet-stream\r\n"); + body.extend_from_slice( + format!( + "Content-Disposition: inline; name=\"data\"; index=\"{}\"\r\n", + idx + ) + .as_bytes(), + ); + body.extend_from_slice(format!("Content-Length: {}\r\n\r\n", item.len()).as_bytes()); + body.extend_from_slice(item); + body.extend_from_slice(b"\r\n"); + } + + write_boundary(&mut body, &boundary, true); + + Ok((boundary, body)) +} + +#[get("/keyframe/{uuid}")] +pub async fn get_keyframe_by_uuid( + js: web::Data, + uuid_path: web::Path, + query: web::Query, +) -> impl Responder { + let uuid_str = uuid_path.into_inner(); + + debug!( + "Keyframe query: uuid={}, source_id={}, metadata_format={:?}", + uuid_str, query.source_id, query.metadata_format + ); + + let uuid = match Uuid::parse_str(&uuid_str) { + Ok(u) => u, + Err(e) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": "Invalid UUID format", + "details": e.to_string() + })); + } + }; + + if query.source_id.is_empty() { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": "Invalid parameter", + "details": "source_id cannot be empty" + })); + } + + let mut js_bind = js.service.lock().await; + let result = js_bind.get_keyframe_by_uuid(&query.source_id, uuid).await; + + match result { + Ok(Some(frame_data)) => { + debug!( + "Returning keyframe: uuid={}, data={}, metadata_format={:?}", + uuid, + frame_data.data.len(), + query.metadata_format + ); + + match build_multipart_response(&frame_data.message, &frame_data.data, query.metadata_format) { + Ok((boundary, body)) => HttpResponse::Ok() + .content_type(format!("multipart/mixed; boundary={}", boundary)) + .body(body), + Err(e) => { + warn!("Failed to build multipart response: {}", e); + HttpResponse::InternalServerError().json(serde_json::json!({ + "error": "Failed to build response", + "details": e + })) + } + } + } + Ok(None) => { + debug!( + "Keyframe not found: uuid={}, source_id={}", + uuid, query.source_id + ); + + HttpResponse::NotFound().json(serde_json::json!({ + "error": format!( + "Keyframe not found: uuid={} for source_id='{}'", + uuid, query.source_id + ) + })) + } + Err(e) => { + warn!( + "Error retrieving keyframe: uuid={}, source_id={}, error={}", + uuid, query.source_id, e + ); + + HttpResponse::InternalServerError().json(serde_json::json!({ + "error": "Internal server error", + "details": e.to_string() + })) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use savant_core::protobuf::deserialize as protobuf_deserialize; + use savant_core::test::gen_frame; + + #[test] + fn test_build_multipart_response_native_format() { + let frame = gen_frame(); + let message = frame.to_message(); + let data = vec![vec![1u8, 2, 3, 4], vec![5u8, 6, 7, 8]]; + + let result = build_multipart_response(&message, &data, MetadataFormat::Native); + assert!(result.is_ok()); + + let (boundary, body) = result.unwrap(); + let body_str = String::from_utf8_lossy(&body); + + assert!(boundary.starts_with("savant-frame-")); + assert!(body_str.contains("Content-Type: application/x-protobuf")); + } + + #[test] + fn test_native_format_roundtrip() { + let frame = gen_frame(); + let original_message = frame.to_message(); + + let native_bytes = protobuf_serialize(&original_message).unwrap(); + let restored_message = protobuf_deserialize(&native_bytes).unwrap(); + + assert!(restored_message.is_video_frame()); + let restored_frame = restored_message.as_video_frame().unwrap(); + + assert_eq!(frame.get_uuid(), restored_frame.get_uuid()); + assert_eq!(frame.get_source_id(), restored_frame.get_source_id()); + } +} diff --git a/services/replay/replaydb/src/job.rs b/services/replay/replaydb/src/job.rs index 67403b05..10f11201 100644 --- a/services/replay/replaydb/src/job.rs +++ b/services/replay/replaydb/src/job.rs @@ -644,6 +644,14 @@ mod tests { ) -> Result> { unreachable!("MockStore::find_keyframes") } + + async fn get_keyframe_by_uuid( + &mut self, + _source_id: &str, + _uuid: Uuid, + ) -> Result> { + unreachable!("MockStore::get_keyframe_by_uuid") + } } #[test] diff --git a/services/replay/replaydb/src/service/rocksdb_service.rs b/services/replay/replaydb/src/service/rocksdb_service.rs index 90c1a9df..85d1a86e 100644 --- a/services/replay/replaydb/src/service/rocksdb_service.rs +++ b/services/replay/replaydb/src/service/rocksdb_service.rs @@ -17,7 +17,7 @@ use crate::job::SyncJobStopCondition; use crate::service::configuration::ServiceConfiguration; use crate::service::JobManager; use crate::store::rocksdb::RocksDbStore; -use crate::store::{Store, SyncRocksDbStore}; +use crate::store::{KeyframeRecord, Store, SyncRocksDbStore}; use crate::stream_processor::RocksDbStreamProcessor; #[allow(clippy::type_complexity)] @@ -104,6 +104,15 @@ impl RocksDbService { let mut store = self.store.lock().await; store.find_keyframes(source_id, from, to, limit).await } + + pub async fn get_keyframe_by_uuid( + &mut self, + source_id: &str, + uuid: Uuid, + ) -> Result> { + let mut store = self.store.lock().await; + store.get_keyframe_by_uuid(source_id, uuid).await + } } impl JobManager for RocksDbService { diff --git a/services/replay/replaydb/src/store.rs b/services/replay/replaydb/src/store.rs index 2ac2b7bc..db144980 100644 --- a/services/replay/replaydb/src/store.rs +++ b/services/replay/replaydb/src/store.rs @@ -34,6 +34,13 @@ pub enum JobOffset { Seconds(f64), } +#[derive(Debug, Clone)] +pub struct KeyframeRecord { + pub message: Message, + pub data: Vec>, + pub message_index: usize, +} + pub type SyncRocksDbStore = Arc>; pub(crate) trait Store { @@ -67,6 +74,12 @@ pub(crate) trait Store { to: Option, limit: usize, ) -> Result>; + + async fn get_keyframe_by_uuid( + &mut self, + source_id: &str, + uuid: Uuid, + ) -> Result>; } #[cfg(test)] @@ -172,6 +185,24 @@ mod tests { .map(|(u, _)| *u) .collect()) } + + async fn get_keyframe_by_uuid( + &mut self, + _source_id: &str, + uuid: Uuid, + ) -> Result> { + for (kf_uuid, idx) in &self.keyframes { + if *kf_uuid == uuid { + let message = &self.messages[*idx]; + return Ok(Some(KeyframeRecord { + message: message.clone(), + data: vec![], + message_index: *idx, + })); + } + } + Ok(None) + } } #[tokio::test] diff --git a/services/replay/replaydb/src/store/rocksdb.rs b/services/replay/replaydb/src/store/rocksdb.rs index 34fc601a..82e88a47 100644 --- a/services/replay/replaydb/src/store/rocksdb.rs +++ b/services/replay/replaydb/src/store/rocksdb.rs @@ -3,7 +3,7 @@ use std::path::Path; use std::time::{Duration, SystemTime}; use std::{fs, io}; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use bincode::config::{BigEndian, Configuration}; use bincode::{Decode, Encode}; use hashbrown::HashMap; @@ -15,7 +15,7 @@ use uuid::Uuid; use crate::get_keyframe_boundary; use crate::service::configuration::Storage; -use crate::store::JobOffset; +use crate::store::{JobOffset, KeyframeRecord}; const CF_MESSAGE_DB: &str = "message_db"; const CF_KEYFRAME_DB: &str = "keyframes_db"; @@ -432,11 +432,50 @@ impl super::Store for RocksDbStore { } Ok(keyframes) } + + async fn get_keyframe_by_uuid( + &mut self, + source_id: &str, + uuid: Uuid, + ) -> Result> { + let source_hash = self.get_source_hash(source_id)?; + let key = KeyframeKey { + source_md5: source_hash, + keyframe_uuid: uuid.as_u128(), + }; + + let cf = self + .db + .cf_handle(CF_KEYFRAME_DB) + .ok_or_else(|| anyhow!("CF_KEYFRAME_DB not found"))?; + + let key_bytes = bincode::encode_to_vec(key, self.configuration)?; + + let value_bytes = match self.db.get_cf(cf, key_bytes)? { + Some(bytes) => bytes, + None => return Ok(None), + }; + + let keyframe_value: KeyframeValue = + bincode::decode_from_slice(&value_bytes, self.configuration)?.0; + + let (message, _topic, data) = self + .get_message(source_id, keyframe_value.index) + .await? + .ok_or_else(|| anyhow!("Message not found for index {}", keyframe_value.index))?; + + Ok(Some(KeyframeRecord { + message, + data, + message_index: keyframe_value.index, + })) + } } #[cfg(test)] mod tests { use savant_core::test::gen_frame; + use savant_core::utils::uuid_v7::incremental_uuid_v7; use std::ops::Div; use tokio_timerfd::sleep; @@ -625,6 +664,84 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_get_keyframe_by_uuid_not_found() -> Result<()> { + let conf = create_test_db_config()?; + let path = { + let Storage::RocksDB { path, .. } = &conf; + path.clone() + }; + let mut db = RocksDbStore::new(&conf)?; + + let frame = gen_properly_filled_frame(true); + db.add_message(&frame.to_message(), b"topic", &[vec![0u8; 2]]) + .await?; + + let missing_uuid = incremental_uuid_v7(); + let result = db + .get_keyframe_by_uuid(&frame.get_source_id(), missing_uuid) + .await?; + assert!(result.is_none()); + + let _ = RocksDbStore::remove_db(&path); + Ok(()) + } + + #[tokio::test] + async fn test_get_keyframe_by_uuid_wrong_source() -> Result<()> { + let conf = create_test_db_config()?; + let path = { + let Storage::RocksDB { path, .. } = &conf; + path.clone() + }; + let mut db = RocksDbStore::new(&conf)?; + + let mut frame = gen_properly_filled_frame(true); + frame.set_source_id("source-one"); + let uuid = frame.get_uuid(); + db.add_message(&frame.to_message(), b"topic", &[vec![0u8; 4]]) + .await?; + + let result = db.get_keyframe_by_uuid("source-two", uuid).await?; + assert!(result.is_none()); + + let _ = RocksDbStore::remove_db(&path); + Ok(()) + } + + #[tokio::test] + async fn test_get_keyframe_by_uuid_success() -> Result<()> { + let conf = create_test_db_config()?; + let path = { + let Storage::RocksDB { path, .. } = &conf; + path.clone() + }; + let mut db = RocksDbStore::new(&conf)?; + + let frame = gen_properly_filled_frame(true); + let uuid = frame.get_uuid(); + let source_id = frame.get_source_id(); + let data_items = vec![vec![1u8, 2, 3, 4], vec![5u8, 6, 7, 8]]; + db.add_message(&frame.to_message(), b"topic", &data_items) + .await?; + + let result = db.get_keyframe_by_uuid(&source_id, uuid).await?; + assert!(result.is_some()); + + let keyframe_data = result.unwrap(); + assert!(keyframe_data.message.is_video_frame()); + assert_eq!(keyframe_data.data.len(), 2); + assert_eq!(keyframe_data.data[0], vec![1u8, 2, 3, 4]); + assert_eq!(keyframe_data.data[1], vec![5u8, 6, 7, 8]); + assert_eq!(keyframe_data.message_index, 0); + + let video_frame = keyframe_data.message.as_video_frame().unwrap(); + assert_eq!(video_frame.get_uuid(), uuid); + + let _ = RocksDbStore::remove_db(&path); + Ok(()) + } + #[tokio::test] async fn test_keyframes() -> Result<()> { let conf = create_test_db_config()?;