From b37c951b4ebc7d6b2fa630cd884d34ec4cc001f9 Mon Sep 17 00:00:00 2001 From: Bernard Gawor Date: Thu, 7 Aug 2025 14:27:46 +0200 Subject: [PATCH 1/7] upload manifest to s3 --- lib/ex_webrtc_recorder.ex | 3 ++- lib/ex_webrtc_recorder/s3/upload_handler.ex | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/ex_webrtc_recorder.ex b/lib/ex_webrtc_recorder.ex index 057535f..da101e4 100644 --- a/lib/ex_webrtc_recorder.ex +++ b/lib/ex_webrtc_recorder.ex @@ -320,13 +320,14 @@ defmodule ExWebRTC.Recorder do end) manifest_diff = to_manifest(state.track_data, track_ids) + files_to_upload = Map.put(manifest_diff, "manifest_file", %{location: state.manifest_path}) case state.upload_handler do nil -> {manifest_diff, nil, state} handler -> - {ref, handler} = S3.UploadHandler.spawn_task(handler, manifest_diff) + {ref, handler} = S3.UploadHandler.spawn_task(handler, files_to_upload) {manifest_diff, ref, %{state | upload_handler: handler}} end diff --git a/lib/ex_webrtc_recorder/s3/upload_handler.ex b/lib/ex_webrtc_recorder/s3/upload_handler.ex index 807bfb3..8e548fd 100644 --- a/lib/ex_webrtc_recorder/s3/upload_handler.ex +++ b/lib/ex_webrtc_recorder/s3/upload_handler.ex @@ -40,17 +40,17 @@ if Code.ensure_loaded?(ExAws.S3) do def spawn_task( %__MODULE__{bucket_name: bucket_name, s3_config_overrides: s3_config_overrides} = handler, - manifest + files_to_upload ) do s3_paths = - Map.new(manifest, fn {id, %{location: path}} -> + Map.new(files_to_upload, fn {id, %{location: path}} -> s3_path = path |> Path.basename() |> then(&Path.join(handler.base_path, &1)) {id, s3_path} end) download_manifest = - Map.new(manifest, fn {id, object_data} -> + Map.new(files_to_upload, fn {id, object_data} -> {:ok, location} = Recorder.S3.Utils.to_url(bucket_name, s3_paths[id]) {id, %{object_data | location: location}} @@ -60,7 +60,7 @@ if Code.ensure_loaded?(ExAws.S3) do # but this may require a slight change of the current UploadHandler logic task = Task.Supervisor.async(ExWebRTC.Recorder.TaskSupervisor, fn -> - upload(manifest, bucket_name, s3_paths, s3_config_overrides) + upload(files_to_upload, bucket_name, s3_paths, s3_config_overrides) end) {task.ref, @@ -95,8 +95,8 @@ if Code.ensure_loaded?(ExAws.S3) do {result, manifest, %__MODULE__{handler | tasks: tasks}} end - defp upload(manifest, bucket_name, s3_paths, s3_config_overrides) do - Map.new(manifest, fn {id, %{location: path}} -> + defp upload(files_to_upload, bucket_name, s3_paths, s3_config_overrides) do + Map.new(files_to_upload, fn {id, %{location: path}} -> %{^id => s3_path} = s3_paths Logger.debug("Uploading `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`") From 04eb3128850958983c3d8fd86d709fe31739e81d Mon Sep 17 00:00:00 2001 From: Bernard Gawor Date: Thu, 7 Aug 2025 17:39:51 +0200 Subject: [PATCH 2/7] upload manifest with correct s3 paths --- lib/ex_webrtc_recorder/s3/upload_handler.ex | 25 +++++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/lib/ex_webrtc_recorder/s3/upload_handler.ex b/lib/ex_webrtc_recorder/s3/upload_handler.ex index 8e548fd..78498a8 100644 --- a/lib/ex_webrtc_recorder/s3/upload_handler.ex +++ b/lib/ex_webrtc_recorder/s3/upload_handler.ex @@ -49,13 +49,7 @@ if Code.ensure_loaded?(ExAws.S3) do {id, s3_path} end) - download_manifest = - Map.new(files_to_upload, fn {id, object_data} -> - {:ok, location} = Recorder.S3.Utils.to_url(bucket_name, s3_paths[id]) - - {id, %{object_data | location: location}} - end) - + download_manifest = prepare_download_manifest(files_to_upload, bucket_name, s3_paths) # FIXME: this links, ideally we should use `async_nolink` instead # but this may require a slight change of the current UploadHandler logic task = @@ -118,6 +112,23 @@ if Code.ensure_loaded?(ExAws.S3) do {id, result} end) end + + defp prepare_download_manifest(files_to_upload, bucket_name, s3_paths) do + {manifest_file, track_files} = Map.pop(files_to_upload, "manifest_file") + + download_manifest = + Map.new(track_files, fn {id, object_data} -> + {:ok, location} = Recorder.S3.Utils.to_url(bucket_name, s3_paths[id]) + {id, %{object_data | location: location}} + end) + + # Update the local manifest file to contain S3 URLs instead of local paths + if manifest_file do + :ok = File.write!(manifest_file.location, Jason.encode!(download_manifest)) + end + + download_manifest + end end else defmodule ExWebRTC.Recorder.S3.UploadHandler do From b9f761fb1ceb8fb6d7e92a5b9a5b4f45738e9744 Mon Sep 17 00:00:00 2001 From: Bernard Gawor Date: Mon, 11 Aug 2025 14:55:14 +0200 Subject: [PATCH 3/7] Revert "upload manifest with correct s3 paths" This reverts commit 04eb3128850958983c3d8fd86d709fe31739e81d. --- lib/ex_webrtc_recorder/s3/upload_handler.ex | 25 ++++++--------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/lib/ex_webrtc_recorder/s3/upload_handler.ex b/lib/ex_webrtc_recorder/s3/upload_handler.ex index 78498a8..8e548fd 100644 --- a/lib/ex_webrtc_recorder/s3/upload_handler.ex +++ b/lib/ex_webrtc_recorder/s3/upload_handler.ex @@ -49,7 +49,13 @@ if Code.ensure_loaded?(ExAws.S3) do {id, s3_path} end) - download_manifest = prepare_download_manifest(files_to_upload, bucket_name, s3_paths) + download_manifest = + Map.new(files_to_upload, fn {id, object_data} -> + {:ok, location} = Recorder.S3.Utils.to_url(bucket_name, s3_paths[id]) + + {id, %{object_data | location: location}} + end) + # FIXME: this links, ideally we should use `async_nolink` instead # but this may require a slight change of the current UploadHandler logic task = @@ -112,23 +118,6 @@ if Code.ensure_loaded?(ExAws.S3) do {id, result} end) end - - defp prepare_download_manifest(files_to_upload, bucket_name, s3_paths) do - {manifest_file, track_files} = Map.pop(files_to_upload, "manifest_file") - - download_manifest = - Map.new(track_files, fn {id, object_data} -> - {:ok, location} = Recorder.S3.Utils.to_url(bucket_name, s3_paths[id]) - {id, %{object_data | location: location}} - end) - - # Update the local manifest file to contain S3 URLs instead of local paths - if manifest_file do - :ok = File.write!(manifest_file.location, Jason.encode!(download_manifest)) - end - - download_manifest - end end else defmodule ExWebRTC.Recorder.S3.UploadHandler do From e3c6e4e44451a0dff4bbe646810d7c871fa60643 Mon Sep 17 00:00:00 2001 From: Bernard Gawor Date: Mon, 11 Aug 2025 16:14:12 +0200 Subject: [PATCH 4/7] Apply comment suggestion --- lib/ex_webrtc_recorder.ex | 3 +-- lib/ex_webrtc_recorder/s3/upload_handler.ex | 22 +++++++++++++++------ lib/ex_webrtc_recorder/s3/utils.ex | 13 ++++++++++++ 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/lib/ex_webrtc_recorder.ex b/lib/ex_webrtc_recorder.ex index da101e4..057535f 100644 --- a/lib/ex_webrtc_recorder.ex +++ b/lib/ex_webrtc_recorder.ex @@ -320,14 +320,13 @@ defmodule ExWebRTC.Recorder do end) manifest_diff = to_manifest(state.track_data, track_ids) - files_to_upload = Map.put(manifest_diff, "manifest_file", %{location: state.manifest_path}) case state.upload_handler do nil -> {manifest_diff, nil, state} handler -> - {ref, handler} = S3.UploadHandler.spawn_task(handler, files_to_upload) + {ref, handler} = S3.UploadHandler.spawn_task(handler, manifest_diff) {manifest_diff, ref, %{state | upload_handler: handler}} end diff --git a/lib/ex_webrtc_recorder/s3/upload_handler.ex b/lib/ex_webrtc_recorder/s3/upload_handler.ex index 8e548fd..fbfe383 100644 --- a/lib/ex_webrtc_recorder/s3/upload_handler.ex +++ b/lib/ex_webrtc_recorder/s3/upload_handler.ex @@ -40,27 +40,37 @@ if Code.ensure_loaded?(ExAws.S3) do def spawn_task( %__MODULE__{bucket_name: bucket_name, s3_config_overrides: s3_config_overrides} = handler, - files_to_upload + manifest ) do s3_paths = - Map.new(files_to_upload, fn {id, %{location: path}} -> + Map.new(manifest, fn {id, %{location: path}} -> s3_path = path |> Path.basename() |> then(&Path.join(handler.base_path, &1)) {id, s3_path} end) download_manifest = - Map.new(files_to_upload, fn {id, object_data} -> + Map.new(manifest, fn {id, object_data} -> {:ok, location} = Recorder.S3.Utils.to_url(bucket_name, s3_paths[id]) {id, %{object_data | location: location}} end) + manifest_s3_path = + Path.join(handler.base_path, "manifest.json") + + Recorder.S3.Utils.upload_manifest( + download_manifest, + bucket_name, + manifest_s3_path, + s3_config_overrides + ) + # FIXME: this links, ideally we should use `async_nolink` instead # but this may require a slight change of the current UploadHandler logic task = Task.Supervisor.async(ExWebRTC.Recorder.TaskSupervisor, fn -> - upload(files_to_upload, bucket_name, s3_paths, s3_config_overrides) + upload(manifest, bucket_name, s3_paths, s3_config_overrides) end) {task.ref, @@ -95,8 +105,8 @@ if Code.ensure_loaded?(ExAws.S3) do {result, manifest, %__MODULE__{handler | tasks: tasks}} end - defp upload(files_to_upload, bucket_name, s3_paths, s3_config_overrides) do - Map.new(files_to_upload, fn {id, %{location: path}} -> + defp upload(manifest, bucket_name, s3_paths, s3_config_overrides) do + Map.new(manifest, fn {id, %{location: path}} -> %{^id => s3_path} = s3_paths Logger.debug("Uploading `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`") diff --git a/lib/ex_webrtc_recorder/s3/utils.ex b/lib/ex_webrtc_recorder/s3/utils.ex index 04a16a4..413d03e 100644 --- a/lib/ex_webrtc_recorder/s3/utils.ex +++ b/lib/ex_webrtc_recorder/s3/utils.ex @@ -10,6 +10,18 @@ if Code.ensure_loaded?(ExAws.S3) do |> ExAws.request(s3_config) end + @chunk_size 5 * 1024 * 1024 + + @spec upload_manifest(Manifest.t(), String.t(), String.t(), keyword()) :: + {:ok | :error, term()} + def upload_manifest(manifest, s3_bucket_name, s3_path, s3_config \\ []) do + manifest + |> Jason.encode!() + |> ExWebRTC.Utils.chunk(@chunk_size) + |> ExAws.S3.upload(s3_bucket_name, s3_path) + |> ExAws.request(s3_config) + end + @spec fetch_file(String.t(), String.t(), Path.t(), keyword()) :: {:ok | :error, term()} def fetch_file(s3_bucket_name, s3_path, output_path, s3_config \\ []) do ExAws.S3.download_file(s3_bucket_name, s3_path, output_path) @@ -67,6 +79,7 @@ else @moduledoc false def upload_file(_, _, _, _ \\ nil), do: error() + def upload_manifest(_, _, _, _ \\ nil), do: error() def fetch_file(_, _, _, _ \\ nil), do: error() def to_url(_, _), do: error() def parse_url(_), do: error() From c0f0530f3b82d7d731de821dbf6c6a65339f58f3 Mon Sep 17 00:00:00 2001 From: Bernard Gawor Date: Mon, 11 Aug 2025 16:21:22 +0200 Subject: [PATCH 5/7] Bump exwebrtc version --- lib/ex_webrtc_recorder/s3/utils.ex | 2 +- mix.exs | 2 +- mix.lock | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/ex_webrtc_recorder/s3/utils.ex b/lib/ex_webrtc_recorder/s3/utils.ex index 413d03e..23219b7 100644 --- a/lib/ex_webrtc_recorder/s3/utils.ex +++ b/lib/ex_webrtc_recorder/s3/utils.ex @@ -12,7 +12,7 @@ if Code.ensure_loaded?(ExAws.S3) do @chunk_size 5 * 1024 * 1024 - @spec upload_manifest(Manifest.t(), String.t(), String.t(), keyword()) :: + @spec upload_manifest(ExWebRTC.Recorder.Manifest.t(), String.t(), String.t(), keyword()) :: {:ok | :error, term()} def upload_manifest(manifest, s3_bucket_name, s3_path, s3_config \\ []) do manifest diff --git a/mix.exs b/mix.exs index 320e31e..8843b7e 100644 --- a/mix.exs +++ b/mix.exs @@ -56,7 +56,7 @@ defmodule ExWebRTC.Recorder.MixProject do defp deps do [ - {:ex_webrtc, "~> 0.14.0"}, + {:ex_webrtc, "~> 0.15.0"}, {:jason, "~> 1.4"}, {:membrane_core, "~> 1.2"}, {:membrane_rtp_plugin, "~> 0.31.0"}, diff --git a/mix.lock b/mix.lock index efe8a92..cb9489a 100644 --- a/mix.lock +++ b/mix.lock @@ -15,18 +15,18 @@ "ex_aws": {:hex, :ex_aws, "2.5.10", "d3f8ca8959dad6533a2a934dfdf380df1b1bef425feeb215a47a5176dee8736c", [:mix], [{:configparser_ex, "~> 5.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10 or ~> 0.6 or ~> 1.0", [hex: :req, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "88fcd9cc1b2e0fcea65106bdaa8340ac56c6e29bf72f46cf7ef174027532d3da"}, "ex_aws_s3": {:hex, :ex_aws_s3, "2.5.7", "e571424d2f345299753382f3a01b005c422b1a460a8bc3ed47659b3d3ef91e9e", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "858e51241e50181e29aa2bc128fef548873a3a9cd580471f57eda5b64dec937f"}, "ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"}, - "ex_dtls": {:hex, :ex_dtls, "0.17.0", "dbe1d494583a307c26148cb5ea5d7c14e65daa8ec96cc73002cc3313ce4b9a81", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "3eaa7221ec08fa9e4bc9430e426cbd5eb4feb8d8f450b203cf39b2114a94d713"}, - "ex_ice": {:hex, :ex_ice, "0.12.0", "b52ec3ff878d5fb632ef9facc7657dfdf59e2ff9f23e634b0918e6ce1a05af48", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.2.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "a86024a5fbf9431082784be4bb3606d3cde9218fb325a9f208ccd6e0abfd0d73"}, + "ex_dtls": {:hex, :ex_dtls, "0.18.0", "0815e3384bb0c1e6c06559012479cf9a94a501ddf46c3df54dc2d1b169e29d5c", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "562eda1815eeaed8360b2b5c34d4db5b453794bc096404a4c64f193fa7b18bf2"}, + "ex_ice": {:hex, :ex_ice, "0.13.0", "13a6ae106b26bb5f2957a586bf20d4031299e5b968533828e637bb4ac7645d31", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.2.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "0d65afa15e36b5610d0f51e72e4c25b22346caa9a6d7d2f6f1cfd8db94bd494e"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, "ex_sdp": {:hex, :ex_sdp, "1.1.1", "1a7b049491e5ec02dad9251c53d960835dc5631321ae978ec331831f3e4f6d5f", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "1b13a72ac9c5c695b8824dbdffc671be8cbb4c0d1ccb4ff76a04a6826759f233"}, "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, "ex_turn": {:hex, :ex_turn, "0.2.0", "4e1f9b089e9a5ee44928d12370cc9ea7a89b84b2f6256832de65271212eb80de", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "08e884f0af2c4a147e3f8cd4ffe33e3452a256389f0956e55a8c4d75bf0e74cd"}, - "ex_webrtc": {:hex, :ex_webrtc, "0.14.0", "47d3d100fb2294ac82e06269335b702a3f5c5c55cae90444c38e7b78ed6ee79d", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.17.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:ex_ice, "~> 0.12.0", [hex: :ex_ice, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.7.1", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sctp, "0.1.2", [hex: :ex_sctp, repo: "hexpm", optional: true]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}], "hexpm", "280d66c8fedefd78e1a38b734fb25e1fd7ddf4f2daa897ee5afd11aa20d311f8"}, + "ex_webrtc": {:hex, :ex_webrtc, "0.15.0", "c5849edcf7d035fcecf01db5be6d33a9d111999640bfc9d13a8c24e8eab7cced", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.18.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:ex_ice, "~> 0.13.0", [hex: :ex_ice, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.7.1", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sctp, "0.1.2", [hex: :ex_sctp, repo: "hexpm", optional: true]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}], "hexpm", "79c21017b45a464c513f87e64ae9a20c8085d937fb5e0d639c50a8c41018172d"}, "excoveralls": {:hex, :excoveralls, "0.18.5", "e229d0a65982613332ec30f07940038fe451a2e5b29bce2a5022165f0c9b157e", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "523fe8a15603f86d64852aab2abe8ddbd78e68579c8525ae765facc5eae01562"}, "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, - "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, + "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, @@ -63,7 +63,7 @@ "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, "ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"}, - "req": {:hex, :req, "0.5.10", "a3a063eab8b7510785a467f03d30a8d95f66f5c3d9495be3474b61459c54376c", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "8a604815743f8a2d3b5de0659fa3137fa4b1cffd636ecb69b30b2b9b2c2559be"}, + "req": {:hex, :req, "0.5.15", "662020efb6ea60b9f0e0fac9be88cd7558b53fe51155a2d9899de594f9906ba9", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "a6513a35fad65467893ced9785457e91693352c70b58bbc045b47e5eb2ef0c53"}, "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, "sweet_xml": {:hex, :sweet_xml, "0.7.5", "803a563113981aaac202a1dbd39771562d0ad31004ddbfc9b5090bdcd5605277", [:mix], [], "hexpm", "193b28a9b12891cae351d81a0cead165ffe67df1b73fe5866d10629f4faefb12"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, From c733abfbd1f352caf793066636d03f5981d48d84 Mon Sep 17 00:00:00 2001 From: Bernard Gawor Date: Tue, 12 Aug 2025 14:31:46 +0200 Subject: [PATCH 6/7] =?UTF-8?q?Add=20a=20manifest=20to=20files=20that=20ar?= =?UTF-8?q?e=20checked=20to=20confirm=20they=E2=80=99ve=20been=20uploaded?= =?UTF-8?q?=20successfully.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/ex_webrtc_recorder/s3.ex | 3 +- lib/ex_webrtc_recorder/s3/upload_handler.ex | 99 ++++++++++++++------- 2 files changed, 68 insertions(+), 34 deletions(-) diff --git a/lib/ex_webrtc_recorder/s3.ex b/lib/ex_webrtc_recorder/s3.ex index a5ebda8..6b3a16f 100644 --- a/lib/ex_webrtc_recorder/s3.ex +++ b/lib/ex_webrtc_recorder/s3.ex @@ -25,7 +25,8 @@ defmodule ExWebRTC.Recorder.S3 do * `:bucket_name` (required) - Name of bucket objects will be uploaded to. * `:base_path` - S3 path prefix used for objects uploaded to the bucket. `""` by default. """ - @type upload_option :: {:bucket_name, String.t()} | {:base_path, String.t()} + @type upload_option :: + {:bucket_name, String.t()} | {:base_path, String.t()} | {:upload_manifest, boolean()} @type upload_config :: [upload_option() | override_option()] diff --git a/lib/ex_webrtc_recorder/s3/upload_handler.ex b/lib/ex_webrtc_recorder/s3/upload_handler.ex index fbfe383..71b8e72 100644 --- a/lib/ex_webrtc_recorder/s3/upload_handler.ex +++ b/lib/ex_webrtc_recorder/s3/upload_handler.ex @@ -14,10 +14,11 @@ if Code.ensure_loaded?(ExAws.S3) do s3_config_overrides: keyword(), bucket_name: String.t(), base_path: Path.t(), - tasks: %{ref() => manifest()} + tasks: %{ref() => manifest()}, + upload_manifest: boolean() } - @enforce_keys [:s3_config_overrides, :bucket_name, :base_path] + @enforce_keys [:s3_config_overrides, :bucket_name, :base_path, :upload_manifest] defstruct @enforce_keys ++ [tasks: %{}] @spec new(keyword()) :: t() @@ -27,24 +28,32 @@ if Code.ensure_loaded?(ExAws.S3) do base_path = Keyword.get(config, :base_path, "") {:ok, _test_path} = base_path |> Path.join("a") |> Recorder.S3.Utils.validate_s3_path() + upload_manifest = Keyword.get(config, :upload_manifest, false) + s3_config_overrides = Keyword.drop(config, [:bucket_name, :base_path]) %__MODULE__{ bucket_name: bucket_name, base_path: base_path, + upload_manifest: upload_manifest, s3_config_overrides: s3_config_overrides } end @spec spawn_task(t(), manifest()) :: {ref(), t()} def spawn_task( - %__MODULE__{bucket_name: bucket_name, s3_config_overrides: s3_config_overrides} = + %__MODULE__{ + bucket_name: bucket_name, + base_path: base_path, + upload_manifest: upload_manifest, + s3_config_overrides: s3_config_overrides + } = handler, manifest ) do s3_paths = Map.new(manifest, fn {id, %{location: path}} -> - s3_path = path |> Path.basename() |> then(&Path.join(handler.base_path, &1)) + s3_path = path |> Path.basename() |> then(&Path.join(base_path, &1)) {id, s3_path} end) @@ -56,21 +65,19 @@ if Code.ensure_loaded?(ExAws.S3) do {id, %{object_data | location: location}} end) - manifest_s3_path = - Path.join(handler.base_path, "manifest.json") - - Recorder.S3.Utils.upload_manifest( - download_manifest, - bucket_name, - manifest_s3_path, - s3_config_overrides - ) - # FIXME: this links, ideally we should use `async_nolink` instead # but this may require a slight change of the current UploadHandler logic task = Task.Supervisor.async(ExWebRTC.Recorder.TaskSupervisor, fn -> - upload(manifest, bucket_name, s3_paths, s3_config_overrides) + upload( + manifest, + bucket_name, + base_path, + s3_paths, + s3_config_overrides, + upload_manifest, + download_manifest + ) end) {task.ref, @@ -105,28 +112,54 @@ if Code.ensure_loaded?(ExAws.S3) do {result, manifest, %__MODULE__{handler | tasks: tasks}} end - defp upload(manifest, bucket_name, s3_paths, s3_config_overrides) do - Map.new(manifest, fn {id, %{location: path}} -> - %{^id => s3_path} = s3_paths - Logger.debug("Uploading `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`") + defp upload( + manifest, + bucket_name, + base_path, + s3_paths, + s3_config_overrides, + upload_manifest, + download_manifest + ) do + results = + Map.new(manifest, fn {id, %{location: path}} -> + %{^id => s3_path} = s3_paths + Logger.debug("Uploading `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`") - result = Recorder.S3.Utils.upload_file(path, bucket_name, s3_path, s3_config_overrides) + result = Recorder.S3.Utils.upload_file(path, bucket_name, s3_path, s3_config_overrides) - case result do - {:ok, _output} -> - Logger.debug( - "Successfully uploaded `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`" - ) + case result do + {:ok, _output} -> + Logger.debug( + "Successfully uploaded `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`" + ) - {:error, reason} -> - Logger.warning(""" - Upload of `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}` \ - failed with reason #{inspect(reason)}\ - """) - end + {:error, reason} -> + Logger.warning(""" + Upload of `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}` \ + failed with reason #{inspect(reason)}\ + """) + end + + {id, result} + end) - {id, result} - end) + if upload_manifest do + manifest_s3_path = + Path.join(base_path, "manifest.json") + + manifest_result = + Recorder.S3.Utils.upload_manifest( + download_manifest, + bucket_name, + manifest_s3_path, + s3_config_overrides + ) + + Map.put(results, :manifest, manifest_result) + else + results + end end end else From 0edf793a93f087d65dc13e7ae4bbb56bcaac4098 Mon Sep 17 00:00:00 2001 From: Bernard Gawor Date: Tue, 12 Aug 2025 17:31:42 +0200 Subject: [PATCH 7/7] apply comment suggestions --- lib/ex_webrtc_recorder/s3.ex | 1 + lib/ex_webrtc_recorder/s3/upload_handler.ex | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/lib/ex_webrtc_recorder/s3.ex b/lib/ex_webrtc_recorder/s3.ex index 6b3a16f..39fe026 100644 --- a/lib/ex_webrtc_recorder/s3.ex +++ b/lib/ex_webrtc_recorder/s3.ex @@ -24,6 +24,7 @@ defmodule ExWebRTC.Recorder.S3 do * `:bucket_name` (required) - Name of bucket objects will be uploaded to. * `:base_path` - S3 path prefix used for objects uploaded to the bucket. `""` by default. + * `:upload_manifest` - Whether to upload the manifest file. Defaults to `true`. """ @type upload_option :: {:bucket_name, String.t()} | {:base_path, String.t()} | {:upload_manifest, boolean()} diff --git a/lib/ex_webrtc_recorder/s3/upload_handler.ex b/lib/ex_webrtc_recorder/s3/upload_handler.ex index 71b8e72..7ede937 100644 --- a/lib/ex_webrtc_recorder/s3/upload_handler.ex +++ b/lib/ex_webrtc_recorder/s3/upload_handler.ex @@ -15,10 +15,10 @@ if Code.ensure_loaded?(ExAws.S3) do bucket_name: String.t(), base_path: Path.t(), tasks: %{ref() => manifest()}, - upload_manifest: boolean() + upload_manifest?: boolean() } - @enforce_keys [:s3_config_overrides, :bucket_name, :base_path, :upload_manifest] + @enforce_keys [:s3_config_overrides, :bucket_name, :base_path, :upload_manifest?] defstruct @enforce_keys ++ [tasks: %{}] @spec new(keyword()) :: t() @@ -28,14 +28,14 @@ if Code.ensure_loaded?(ExAws.S3) do base_path = Keyword.get(config, :base_path, "") {:ok, _test_path} = base_path |> Path.join("a") |> Recorder.S3.Utils.validate_s3_path() - upload_manifest = Keyword.get(config, :upload_manifest, false) + upload_manifest? = Keyword.get(config, :upload_manifest, true) s3_config_overrides = Keyword.drop(config, [:bucket_name, :base_path]) %__MODULE__{ bucket_name: bucket_name, base_path: base_path, - upload_manifest: upload_manifest, + upload_manifest?: upload_manifest?, s3_config_overrides: s3_config_overrides } end @@ -45,7 +45,7 @@ if Code.ensure_loaded?(ExAws.S3) do %__MODULE__{ bucket_name: bucket_name, base_path: base_path, - upload_manifest: upload_manifest, + upload_manifest?: upload_manifest?, s3_config_overrides: s3_config_overrides } = handler, @@ -71,12 +71,12 @@ if Code.ensure_loaded?(ExAws.S3) do Task.Supervisor.async(ExWebRTC.Recorder.TaskSupervisor, fn -> upload( manifest, + download_manifest, bucket_name, base_path, s3_paths, s3_config_overrides, - upload_manifest, - download_manifest + upload_manifest? ) end) @@ -114,12 +114,12 @@ if Code.ensure_loaded?(ExAws.S3) do defp upload( manifest, + download_manifest, bucket_name, base_path, s3_paths, s3_config_overrides, - upload_manifest, - download_manifest + upload_manifest? ) do results = Map.new(manifest, fn {id, %{location: path}} -> @@ -144,7 +144,7 @@ if Code.ensure_loaded?(ExAws.S3) do {id, result} end) - if upload_manifest do + if upload_manifest? do manifest_s3_path = Path.join(base_path, "manifest.json")