Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions lib/ex_webrtc_recorder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule ExWebRTC.Recorder do
Can optionally upload the saved files to S3-compatible storage.
See `ExWebRTC.Recorder.S3` and `t:options/0` for more info.
"""

alias ExWebRTC.MediaStreamTrack
alias __MODULE__.S3

Expand Down Expand Up @@ -121,11 +120,12 @@ defmodule ExWebRTC.Recorder do
recorder(),
MediaStreamTrack.id(),
MediaStreamTrack.rid() | nil,
ExWebRTC.RTPCodecParameters.t() | nil,
ExRTP.Packet.t()
) :: :ok
def record(recorder, track_id, rid, %ExRTP.Packet{} = packet) do
def record(recorder, track_id, rid, codec, %ExRTP.Packet{} = packet) do
recv_time = System.monotonic_time(:millisecond)
GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
GenServer.cast(recorder, {:record, track_id, rid, codec, recv_time, packet})
end

@doc """
Expand Down Expand Up @@ -220,10 +220,12 @@ defmodule ExWebRTC.Recorder do
end

@impl true
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
def handle_cast({:record, track_id, rid, codec, recv_time, packet}, state)
when is_map_key(state.track_data, track_id) do
%{file: file, rid_map: rid_map} = state.track_data[track_id]

state = if codec, do: update_codec(state, track_id, codec), else: state

with {:ok, rid_idx} <- Map.fetch(rid_map, rid),
false <- is_nil(file) do
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
Expand All @@ -243,7 +245,7 @@ defmodule ExWebRTC.Recorder do
end

@impl true
def handle_cast({:record, track_id, _rid, _recv_time, _packet}, state) do
def handle_cast({:record, track_id, _rid, _codec, _recv_time, _packet}, state) do
Logger.warning("""
Tried to save packet for unknown track id. Ignoring. Track id: #{inspect(track_id)}.\
""")
Expand Down Expand Up @@ -290,6 +292,7 @@ defmodule ExWebRTC.Recorder do
start_time: start_time,
kind: track.kind,
streams: track.streams,
codec: nil,
rid_map: (track.rids || [nil]) |> Enum.with_index() |> Map.new(),
location: file_path,
file: File.open!(file_path, [:write])
Expand All @@ -302,7 +305,7 @@ defmodule ExWebRTC.Recorder do

state = %{state | track_data: Map.merge(state.track_data, new_track_data)}

:ok = File.write!(state.manifest_path, state.track_data |> to_manifest() |> Jason.encode!())
:ok = write_manifest(state)

{manifest_diff, state}
end
Expand Down Expand Up @@ -342,6 +345,20 @@ defmodule ExWebRTC.Recorder do
end)
end

defp update_codec(state, track_id, codec) do
case get_in(state, [:track_data, track_id, :codec]) do
nil ->
state = put_in(state, [:track_data, track_id, :codec], codec)

:ok = write_manifest(state)
Logger.info("Updated manifest with codec info for track #{track_id}")
state

_ ->
state
end
end

defp serialize_packet(packet, rid_idx, recv_time) do
packet = ExRTP.Packet.encode(packet)
packet_size = byte_size(packet)
Expand All @@ -355,4 +372,14 @@ defmodule ExWebRTC.Recorder do
:io_lib.format("~4..0w~2..0w~2..0w-~2..0w~2..0w~2..0w", [y, mo, d, h, m, s])
|> to_string()
end

defp write_manifest(state) do
File.write!(
state.manifest_path,
state.track_data
|> to_manifest()
|> ExWebRTC.Recorder.Manifest.to_json!()
|> Jason.encode!()
)
end
end
92 changes: 91 additions & 1 deletion lib/ex_webrtc_recorder/manifest.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,63 @@ defmodule ExWebRTC.Recorder.Manifest do
kind: :video | :audio,
streams: [MediaStreamTrack.stream_id()],
rid_map: %{MediaStreamTrack.rid() => integer()},
codec: ExWebRTC.RTPCodecParameters.t() | nil,
location: location()
}

@type t :: %{MediaStreamTrack.id() => track_manifest()}

@doc false
@spec to_json!(t()) :: map()
def to_json!(manifest) do
Map.new(manifest, fn {id, entry} ->
{
id,
%{
"start_time" => DateTime.to_iso8601(entry.start_time),
"kind" => Atom.to_string(entry.kind),
"streams" => entry.streams,
"rid_map" => encode_rid_map(entry.rid_map),
"codec" => encode_codec(entry.codec),
"location" => entry.location
}
}
end)
end

defp encode_rid_map(rid_map) do
Map.new(rid_map, fn
{nil, v} -> {"nil", v}
{layer, v} -> {layer, v}
end)
end

defp encode_codec(nil), do: nil

defp encode_codec(%ExWebRTC.RTPCodecParameters{} = codec) do
%{
"payload_type" => codec.payload_type,
"mime_type" => codec.mime_type,
"clock_rate" => codec.clock_rate,
"channels" => codec.channels,
"sdp_fmtp_line" => fmtp_to_string(codec.sdp_fmtp_line),
"rtcp_fbs" => rtcp_fbs_to_strings(codec.rtcp_fbs)
}
end

defp fmtp_to_string([]), do: nil
defp fmtp_to_string(nil), do: nil
defp fmtp_to_string(fmtp), do: fmtp |> to_string() |> String.replace_prefix("fmtp:", "")

defp rtcp_fbs_to_strings(nil), do: nil
defp rtcp_fbs_to_strings([]), do: nil

defp rtcp_fbs_to_strings(list) when is_list(list) do
list
|> Enum.map(&to_string/1)
|> Enum.map(&String.replace_prefix(&1, "rtcp-fb:", ""))
end

@doc false
@spec from_json!(map()) :: t()
def from_json!(json_manifest) do
Expand All @@ -38,14 +90,16 @@ defmodule ExWebRTC.Recorder.Manifest do
"kind" => kind,
"streams" => streams,
"rid_map" => rid_map,
"codec" => codec,
"location" => location
}) do
%{
streams: streams,
location: location,
start_time: parse_start_time(start_time),
rid_map: parse_rid_map(rid_map),
kind: parse_kind(kind)
kind: parse_kind(kind),
codec: parse_codec(codec)
}
end

Expand All @@ -63,4 +117,40 @@ defmodule ExWebRTC.Recorder.Manifest do

defp parse_kind("video"), do: :video
defp parse_kind("audio"), do: :audio

defp parse_codec(%{
"payload_type" => payload_type,
"mime_type" => mime_type,
"clock_rate" => clock_rate,
"channels" => channels,
"sdp_fmtp_line" => sdp_fmtp_line,
"rtcp_fbs" => rtcp_fbs
}) do
%ExWebRTC.RTPCodecParameters{
payload_type: payload_type,
mime_type: mime_type,
clock_rate: clock_rate,
channels: channels,
sdp_fmtp_line: parse_sdp_fmtp_line(sdp_fmtp_line),
rtcp_fbs: parse_rtcp_fbs(rtcp_fbs)
}
end

defp parse_codec(nil), do: nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this will be "nil" if read from the serialized JSON.

If you have the time, you could consider writing unit tests for this module -- since it looks like we'll continue extending the Recorder, adding tests will help ensure basic functionality like this keeps working with further changes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the value returned by Jason.decode!(json_string) is passed to from_json!() (see converter.ex:134); Jason.decode/1 converts JSON null to Elixir nil, and I added tests to confirm this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, my bad. I assumed it behaved the same way as our rid_map, where we do "nil" -> nil ourselves


defp parse_sdp_fmtp_line(sdp_fmtp_line) when is_binary(sdp_fmtp_line) do
{:ok, fmtp} = ExSDP.Attribute.FMTP.parse(sdp_fmtp_line)
fmtp
end

defp parse_sdp_fmtp_line(nil), do: []

defp parse_rtcp_fbs(rtcp_fbs) when is_list(rtcp_fbs) do
Enum.map(rtcp_fbs, fn fb ->
{:ok, rtcp_fb} = ExSDP.Attribute.RTCPFeedback.parse(fb)
rtcp_fb
end)
end

defp parse_rtcp_fbs(nil), do: []
end
Loading
Loading