Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/ex_webrtc_recorder/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ defmodule ExWebRTC.Recorder.S3 do
* `:bucket_name` (required) - Name of bucket objects will be uploaded to.
* `:base_path` - S3 path prefix used for objects uploaded to the bucket. `""` by default.
"""
@type upload_option :: {:bucket_name, String.t()} | {:base_path, String.t()}
@type upload_option ::
{:bucket_name, String.t()} | {:base_path, String.t()} | {:upload_manifest, boolean()}

@type upload_config :: [upload_option() | override_option()]

Expand Down
99 changes: 66 additions & 33 deletions lib/ex_webrtc_recorder/s3/upload_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ if Code.ensure_loaded?(ExAws.S3) do
s3_config_overrides: keyword(),
bucket_name: String.t(),
base_path: Path.t(),
tasks: %{ref() => manifest()}
tasks: %{ref() => manifest()},
upload_manifest: boolean()
}

@enforce_keys [:s3_config_overrides, :bucket_name, :base_path]
@enforce_keys [:s3_config_overrides, :bucket_name, :base_path, :upload_manifest]
defstruct @enforce_keys ++ [tasks: %{}]

@spec new(keyword()) :: t()
Expand All @@ -27,24 +28,32 @@ if Code.ensure_loaded?(ExAws.S3) do

base_path = Keyword.get(config, :base_path, "")
{:ok, _test_path} = base_path |> Path.join("a") |> Recorder.S3.Utils.validate_s3_path()
upload_manifest = Keyword.get(config, :upload_manifest, false)

s3_config_overrides = Keyword.drop(config, [:bucket_name, :base_path])

%__MODULE__{
bucket_name: bucket_name,
base_path: base_path,
upload_manifest: upload_manifest,
s3_config_overrides: s3_config_overrides
}
end

@spec spawn_task(t(), manifest()) :: {ref(), t()}
def spawn_task(
%__MODULE__{bucket_name: bucket_name, s3_config_overrides: s3_config_overrides} =
%__MODULE__{
bucket_name: bucket_name,
base_path: base_path,
upload_manifest: upload_manifest,
s3_config_overrides: s3_config_overrides
} =
handler,
manifest
) do
s3_paths =
Map.new(manifest, fn {id, %{location: path}} ->
s3_path = path |> Path.basename() |> then(&Path.join(handler.base_path, &1))
s3_path = path |> Path.basename() |> then(&Path.join(base_path, &1))

{id, s3_path}
end)
Expand All @@ -56,21 +65,19 @@ if Code.ensure_loaded?(ExAws.S3) do
{id, %{object_data | location: location}}
end)

manifest_s3_path =
Path.join(handler.base_path, "manifest.json")

Recorder.S3.Utils.upload_manifest(
download_manifest,
bucket_name,
manifest_s3_path,
s3_config_overrides
)

# FIXME: this links, ideally we should use `async_nolink` instead
# but this may require a slight change of the current UploadHandler logic
task =
Task.Supervisor.async(ExWebRTC.Recorder.TaskSupervisor, fn ->
upload(manifest, bucket_name, s3_paths, s3_config_overrides)
upload(
manifest,
bucket_name,
base_path,
s3_paths,
s3_config_overrides,
upload_manifest,
download_manifest
)
end)

{task.ref,
Expand Down Expand Up @@ -105,28 +112,54 @@ if Code.ensure_loaded?(ExAws.S3) do
{result, manifest, %__MODULE__{handler | tasks: tasks}}
end

defp upload(manifest, bucket_name, s3_paths, s3_config_overrides) do
Map.new(manifest, fn {id, %{location: path}} ->
%{^id => s3_path} = s3_paths
Logger.debug("Uploading `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`")
defp upload(
manifest,
bucket_name,
base_path,
s3_paths,
s3_config_overrides,
upload_manifest,
download_manifest
) do
results =
Map.new(manifest, fn {id, %{location: path}} ->
%{^id => s3_path} = s3_paths
Logger.debug("Uploading `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`")

result = Recorder.S3.Utils.upload_file(path, bucket_name, s3_path, s3_config_overrides)
result = Recorder.S3.Utils.upload_file(path, bucket_name, s3_path, s3_config_overrides)

case result do
{:ok, _output} ->
Logger.debug(
"Successfully uploaded `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`"
)
case result do
{:ok, _output} ->
Logger.debug(
"Successfully uploaded `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`"
)

{:error, reason} ->
Logger.warning("""
Upload of `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}` \
failed with reason #{inspect(reason)}\
""")
end
{:error, reason} ->
Logger.warning("""
Upload of `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}` \
failed with reason #{inspect(reason)}\
""")
end

{id, result}
end)

{id, result}
end)
if upload_manifest do
manifest_s3_path =
Path.join(base_path, "manifest.json")

manifest_result =
Recorder.S3.Utils.upload_manifest(
download_manifest,
bucket_name,
manifest_s3_path,
s3_config_overrides
)

Map.put(results, :manifest, manifest_result)
else
results
end
end
end
else
Expand Down
Loading