Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion docs/source/services/replay/4_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,79 @@ Finds keyframes in a video stream. Returns found keyframes from oldest to newest
- ``{"error" => "Reason"}``
- problem description

Get Keyframe by UUID
--------------------

Retrieves a specific keyframe by UUID and returns a multipart response containing JSON metadata and binary data parts.

.. code-block:: bash

curl "http://127.0.0.1:8080/api/v1/keyframe/{uuid}?source_id=in-video" \
-o keyframe.multipart

.. list-table::
:header-rows: 1

* - Parameter
- Description
- Extra
- Description
* - Method
- GET
- /api/v1/keyframe/{uuid}
- returns multipart/mixed response
* - ``uuid`` (path)
- string
- required
- Keyframe UUID (obtained from ``find_keyframes``)
* - ``source_id`` (query)
- string
- required
- video source identifier
* - Response OK
- 200 OK
- multipart/mixed
- metadata + data parts
* - Not Found
- 404 Not Found
- JSON
- UUID not found for this source
* - Invalid Request
- 400 Bad Request
- JSON
- invalid UUID format or empty source id
* - Internal Error
- 500 Internal Server Error
- JSON
- database or decoding issue

**Response Format (200 OK)**

The response uses ``multipart/mixed`` format with the following parts in order:

1. **Metadata part** (``Content-Type: application/json``): JSON-serialized video frame metadata
2. **Data parts** (``Content-Type: application/octet-stream``): Zero or more binary data blobs (encoded frame content, external references, etc.)

Each data part includes an ``index`` attribute in the Content-Disposition header indicating its position (0, 1, 2, ...). The semantic meaning of each index is application-specific and depends on the upstream producer.

Example response structure:

.. code-block:: text

--savant-frame-<uuid>
Content-Type: application/json
Content-Disposition: inline; name="metadata"
Content-Length: <length>

{"uuid": "...", "source_id": "...", "pts": ..., ...}
--savant-frame-<uuid>
Content-Type: application/octet-stream
Content-Disposition: inline; name="data"; index="0"
Content-Length: <length>

<binary data>
--savant-frame-<uuid>--

Create New Job
--------------

Expand Down Expand Up @@ -787,4 +860,3 @@ Updates the stop condition of the running job matching the given UUID.
- 500 Internal Server Error
- ``{"error" => "Reason"}``
- problem description

1 change: 1 addition & 0 deletions services/replay/replay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ anyhow = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
replaydb = { workspace = true }
savant_core = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions services/replay/replay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use replaydb::service::JobManager;

use crate::web_service::del_job::delete_job;
use crate::web_service::find_keyframes::find_keyframes;
use crate::web_service::get_keyframe_by_uuid::get_keyframe_by_uuid;
use crate::web_service::list_jobs::{list_job, list_jobs, list_stopped_jobs};
use crate::web_service::new_job::new_job;
use crate::web_service::shutdown::shutdown;
Expand Down Expand Up @@ -51,6 +52,7 @@ async fn main() -> Result<()> {
.app_data(http_job_service.clone())
.service(status)
.service(shutdown)
.service(get_keyframe_by_uuid)
.service(find_keyframes)
.service(list_stopped_jobs)
.service(list_job)
Expand Down
1 change: 1 addition & 0 deletions services/replay/replay/src/web_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use replaydb::service::rocksdb_service::RocksDbService;

pub mod del_job;
pub mod find_keyframes;
pub mod get_keyframe_by_uuid;
pub mod list_jobs;
pub mod new_job;
pub mod shutdown;
Expand Down
144 changes: 144 additions & 0 deletions services/replay/replay/src/web_service/get_keyframe_by_uuid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use actix_web::{get, web, HttpResponse, Responder};
use log::{debug, warn};
use savant_core::json_api::ToSerdeJsonValue;
use savant_core::message::Message;
use serde::Deserialize;
use uuid::Uuid;

use crate::web_service::JobService;

#[derive(Debug, Deserialize)]
struct GetKeyframeQuery {
source_id: String,
}

fn write_boundary(body: &mut Vec<u8>, boundary: &str, is_final: bool) {
body.extend_from_slice(b"--");
body.extend_from_slice(boundary.as_bytes());
if is_final {
body.extend_from_slice(b"--");
}
body.extend_from_slice(b"\r\n");
}

fn build_multipart_response(
message: &Message,
data: &[Vec<u8>],
) -> Result<(String, Vec<u8>), String> {
let boundary = format!("savant-frame-{}", Uuid::new_v4().as_simple());
let mut body = Vec::new();

write_boundary(&mut body, &boundary, false);

let video_frame = message
.as_video_frame()
.ok_or_else(|| "Message is not a VideoFrame".to_string())?;
let json = video_frame.to_serde_json_value();
let json_bytes =
serde_json::to_vec(&json).map_err(|e| format!("JSON serialization failed: {}", e))?;
body.extend_from_slice(b"Content-Type: application/json\r\n");
body.extend_from_slice(b"Content-Disposition: inline; name=\"metadata\"\r\n");
body.extend_from_slice(format!("Content-Length: {}\r\n\r\n", json_bytes.len()).as_bytes());
body.extend_from_slice(&json_bytes);
body.extend_from_slice(b"\r\n");

for (idx, item) in data.iter().enumerate() {
write_boundary(&mut body, &boundary, false);
body.extend_from_slice(b"Content-Type: application/octet-stream\r\n");
body.extend_from_slice(
format!(
"Content-Disposition: inline; name=\"data\"; index=\"{}\"\r\n",
idx
)
.as_bytes(),
);
body.extend_from_slice(format!("Content-Length: {}\r\n\r\n", item.len()).as_bytes());
body.extend_from_slice(item);
body.extend_from_slice(b"\r\n");
}

write_boundary(&mut body, &boundary, true);

Ok((boundary, body))
}

#[get("/keyframe/{uuid}")]
pub async fn get_keyframe_by_uuid(
js: web::Data<JobService>,
uuid_path: web::Path<String>,
query: web::Query<GetKeyframeQuery>,
) -> impl Responder {
let uuid_str = uuid_path.into_inner();

debug!(
"Keyframe query: uuid={}, source_id={}",
uuid_str, query.source_id
);

let uuid = match Uuid::parse_str(&uuid_str) {
Ok(u) => u,
Err(e) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": "Invalid UUID format",
"details": e.to_string()
}));
}
};

if query.source_id.is_empty() {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": "Invalid parameter",
"details": "source_id cannot be empty"
}));
}

let mut js_bind = js.service.lock().await;
let result = js_bind.get_keyframe_by_uuid(&query.source_id, uuid).await;

match result {
Ok(Some(frame_data)) => {
debug!(
"Returning keyframe: uuid={}, data={}",
uuid,
frame_data.data.len()
);

match build_multipart_response(&frame_data.message, &frame_data.data) {
Ok((boundary, body)) => HttpResponse::Ok()
.content_type(format!("multipart/mixed; boundary={}", boundary))
.body(body),
Err(e) => {
warn!("Failed to build multipart response: {}", e);
HttpResponse::InternalServerError().json(serde_json::json!({
"error": "Failed to build response",
"details": e
}))
}
}
}
Ok(None) => {
debug!(
"Keyframe not found: uuid={}, source_id={}",
uuid, query.source_id
);

HttpResponse::NotFound().json(serde_json::json!({
"error": format!(
"Keyframe not found: uuid={} for source_id='{}'",
uuid, query.source_id
)
}))
}
Err(e) => {
warn!(
"Error retrieving keyframe: uuid={}, source_id={}, error={}",
uuid, query.source_id, e
);

HttpResponse::InternalServerError().json(serde_json::json!({
"error": "Internal server error",
"details": e.to_string()
}))
}
}
}
8 changes: 8 additions & 0 deletions services/replay/replaydb/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,14 @@ mod tests {
) -> Result<Vec<Uuid>> {
unreachable!("MockStore::find_keyframes")
}

async fn get_keyframe_by_uuid(
&mut self,
_source_id: &str,
_uuid: Uuid,
) -> Result<Option<crate::store::KeyframeRecord>> {
unreachable!("MockStore::get_keyframe_by_uuid")
}
}

#[test]
Expand Down
11 changes: 10 additions & 1 deletion services/replay/replaydb/src/service/rocksdb_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::job::SyncJobStopCondition;
use crate::service::configuration::ServiceConfiguration;
use crate::service::JobManager;
use crate::store::rocksdb::RocksDbStore;
use crate::store::{Store, SyncRocksDbStore};
use crate::store::{KeyframeRecord, Store, SyncRocksDbStore};
use crate::stream_processor::RocksDbStreamProcessor;

#[allow(clippy::type_complexity)]
Expand Down Expand Up @@ -104,6 +104,15 @@ impl RocksDbService {
let mut store = self.store.lock().await;
store.find_keyframes(source_id, from, to, limit).await
}

pub async fn get_keyframe_by_uuid(
&mut self,
source_id: &str,
uuid: Uuid,
) -> Result<Option<KeyframeRecord>> {
let mut store = self.store.lock().await;
store.get_keyframe_by_uuid(source_id, uuid).await
}
}

impl JobManager for RocksDbService {
Expand Down
31 changes: 31 additions & 0 deletions services/replay/replaydb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub enum JobOffset {
Seconds(f64),
}

#[derive(Debug, Clone)]
pub struct KeyframeRecord {
pub message: Message,
pub data: Vec<Vec<u8>>,
pub message_index: usize,
}

pub type SyncRocksDbStore = Arc<Mutex<rocksdb::RocksDbStore>>;

pub(crate) trait Store {
Expand Down Expand Up @@ -67,6 +74,12 @@ pub(crate) trait Store {
to: Option<u64>,
limit: usize,
) -> Result<Vec<Uuid>>;

async fn get_keyframe_by_uuid(
&mut self,
source_id: &str,
uuid: Uuid,
) -> Result<Option<KeyframeRecord>>;
}

#[cfg(test)]
Expand Down Expand Up @@ -172,6 +185,24 @@ mod tests {
.map(|(u, _)| *u)
.collect())
}

async fn get_keyframe_by_uuid(
&mut self,
_source_id: &str,
uuid: Uuid,
) -> Result<Option<KeyframeRecord>> {
for (kf_uuid, idx) in &self.keyframes {
if *kf_uuid == uuid {
let message = &self.messages[*idx];
return Ok(Some(KeyframeRecord {
message: message.clone(),
data: vec![],
message_index: *idx,
}));
}
}
Ok(None)
}
}

#[tokio::test]
Expand Down
Loading