Skip to content

Commit 349dd03

Browse files
committed
wip
1 parent cdae334 commit 349dd03

File tree

5 files changed

+330
-10
lines changed

5 files changed

+330
-10
lines changed
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
defmodule ExWebRTC.RTP.Depayloader.AV1 do
2+
@moduledoc false
3+
# Reassembles AV1 video temporal units from RTP packets.
4+
#
5+
# Resources:
6+
# * [RTP Payload Format for AV1 (av1-rtp-spec)](https://aomediacodec.github.io/av1-rtp-spec/v1.0.0.html)
7+
# * [AV1 spec](https://aomediacodec.github.io/av1-spec/av1-spec.pdf).
8+
# * https://norkin.org/research/av1_decoder_model/index.html
9+
# * https://chromium.googlesource.com/external/webrtc/+/HEAD/modules/rtp_rtcp/source/video_rtp_depacketizer_av1.cc
10+
11+
@behaviour ExWebRTC.RTP.Depayloader.Behaviour
12+
13+
require Logger
14+
15+
alias ExWebRTC.RTP.AV1.{OBU, Payload}
16+
17+
@temporal_delimiter %OBU{
18+
type: 2,
19+
x: 0,
20+
s: 1,
21+
payload: <<>>
22+
}
23+
|> OBU.serialize()
24+
25+
@type t :: %__MODULE__{
26+
current_temporal_unit: [],
27+
current_obu: nil,
28+
current_timestamp: nil
29+
}
30+
31+
defstruct [:current_temporal_unit, :current_obu, :current_timestamp]
32+
33+
# XXX where warnings, where debugs?
34+
# XXX MUST ignore TD OBU, Tile List OBU
35+
@impl true
36+
def new() do
37+
%__MODULE__{}
38+
end
39+
40+
@impl true
41+
def depayload(depayloader, packet)
42+
43+
def depayload(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {nil, depayloader}
44+
45+
def depayload(depayloader, packet) do
46+
case Payload.parse(packet.payload) do
47+
{:ok, av1_payload} ->
48+
do_depayload2(depayloader, packet, av1_payload)
49+
50+
{:error, reason} ->
51+
Logger.warning("""
52+
Couldn't parse payload, reason: #{reason}. \
53+
Resetting depayloader state. Payload: #{inspect(packet.payload)}.\
54+
""")
55+
56+
{:ok, %{depayloader | current_temporal_unit: nil, current_timestamp: nil}}
57+
end
58+
end
59+
60+
defp parse_obu_elements(obu_elements, z, y)
61+
62+
defp parse_obu_elements([], _, _) do
63+
Logger.debug("AV1 payload contains no valid OBU elements. Dropping packet.")
64+
{[], nil, nil}
65+
end
66+
67+
defp parse_obu_elements(obus, 0, 0) do
68+
{obus, nil, nil}
69+
end
70+
71+
# Last OBU element is an OBU fragment that will be continued
72+
defp parse_obu_elements(obu_elements, 0, 1) do
73+
{next_obu_fragment, obus} = List.pop_at(obu_elements, -1)
74+
{obus, nil, next_obu_fragment}
75+
end
76+
77+
# First OBU element is an OBU fragment, a continuation of the current OBU
78+
defp parse_obu_elements([current_obu_fragment | obus], 1, 0) do
79+
{obus, current_obu_fragment, nil}
80+
end
81+
82+
# Both. If packet contained exactly 1 OBU fragment, we store it as current_obu_fragment only
83+
defp parse_obu_elements([current_obu_fragment | obu_elements], 1, 1) do
84+
{next_obu_fragment, obus} = List.pop_at(obu_elements, -1)
85+
{obus, current_obu_fragment, next_obu_fragment}
86+
end
87+
88+
defp update_current_obu(depayloader, current_obu_fragment, new_frame?)
89+
90+
defp update_current_obu(depayloader, current_obu_fragment, true) do
91+
if depayloader.current_obu != nil do
92+
Logger.debug(
93+
"Received packet with timestamp from a new temporal unit without finishing the previous OBU. Dropping previous OBU."
94+
)
95+
end
96+
97+
if current_obu_fragment != nil do
98+
Logger.debug("Received ")
99+
end
100+
101+
%{depayloader | current_obu: nil}
102+
end
103+
104+
defp update_current_obu(%__MODULE__{current_obu: nil} = depayloader, nil, false) do
105+
depayloader
106+
end
107+
108+
defp update_current_obu(depayloader, nil, false) do
109+
Logger.debug(
110+
"Received start of new OBU without finishing the previous OBU. Dropping previous OBU."
111+
)
112+
113+
%{depayloader | current_obu: nil}
114+
end
115+
116+
defp update_current_obu(%__MODULE__{current_obu: nil} = depayloader, _obu_fragment, false) do
117+
Logger.debug(
118+
"Received middle OBU fragment without beginning the OBU. Dropping this OBU fragment."
119+
)
120+
121+
depayloader
122+
end
123+
124+
defp update_current_obu(%__MODULE__{current_obu: obu} = depayloader, obu_fragment, false) do
125+
%{depayloader | current_obu: obu <> obu_fragment}
126+
end
127+
128+
# current_obu is nil, nothing to flush
129+
defp maybe_flush_current_obu(
130+
%__MODULE__{current_obu: nil} = depayloader,
131+
obus,
132+
next_obu_fragment,
133+
_y
134+
) do
135+
{depayloader, obus, next_obu_fragment}
136+
end
137+
138+
# Packet contained exactly 1 OBU fragment, current_obu will be continued. Do not flush
139+
# XXX check the nil
140+
defp maybe_flush_current_obu(%__MODULE__{current_obu: incomplete_obu} = depayloader, [], nil, 1) do
141+
{depayloader, [], incomplete_obu}
142+
end
143+
144+
# Otherwise, flush
145+
defp maybe_flush_current_obu(
146+
%__MODULE__{current_obu: obu} = depayloader,
147+
obus,
148+
next_obu_fragment,
149+
_y
150+
) do
151+
{depayloader, [obu | obus], next_obu_fragment}
152+
end
153+
154+
defp update_temporal_unit(
155+
%__MODULE__{current_temporal_unit: tu} = depayloader,
156+
obus,
157+
next_obu_fragment,
158+
timestamp
159+
) do
160+
%{
161+
depayloader
162+
| current_obu: next_obu_fragment,
163+
current_temporal_unit: append_obus(obus, tu),
164+
current_timestamp: timestamp
165+
}
166+
end
167+
168+
# XXX UWAGA NA TIMESTAMPA ŚMIERDZIELA
169+
defp flush_temporal_unit(%__MODULE__{current_temporal_unit: tu} = depayloader) when tu != [] do
170+
# Force s=1 for the low overhead bitstring format
171+
tu_binary =
172+
tu
173+
|> Stream.map(&%OBU{&1 | s: 1})
174+
|> Stream.map(&OBU.serialize/1)
175+
|> Enum.reverse()
176+
|> :erlang.iolist_to_binary()
177+
178+
# if current obu not nil, log
179+
{@temporal_delimiter <> tu_binary,
180+
%{depayloader | current_temporal_unit: [], current_obu: nil, current_timestamp: nil}}
181+
end
182+
183+
defp flush_temporal_unit(depayloader) do
184+
Logger.debug("WRITEME")
185+
# XXX maybe zero?
186+
{nil, depayloader}
187+
end
188+
189+
defp do_depayload2(depayloader, packet, %Payload{z: z, y: y} = av1_payload) do
190+
{obus, current_obu_fragment, next_obu_fragment} =
191+
av1_payload
192+
|> Payload.depayload_obu_elements()
193+
|> parse_obu_elements(z, y)
194+
195+
# {[A], 0, 0} -> {[A], nil, nil}
196+
# {[A], 0, 1} -> {[], nil, A}
197+
# {[A], 1, 0} -> {[], A, nil}
198+
# {[A], 1, 1} -> {[], A, nil}
199+
#
200+
# {[B, C, D], 0, 0} -> {[B, C, D], nil, nil}
201+
# {[B, C, D], 0, 1} -> {[B, C], nil, D}
202+
# {[B, C, D], 1, 0} -> {[C, D], B, nil}
203+
# {[B, C, D], 1, 1} -> {[C], B, D}
204+
205+
new_temporal_unit? = depayloader.current_timestamp != packet.timestamp
206+
207+
{depayloader, obus, next_obu_fragment} =
208+
depayloader
209+
|> update_current_obu(current_obu_fragment, new_temporal_unit?)
210+
|> maybe_flush_current_obu(obus, next_obu_fragment, y)
211+
212+
# {[A], 0, 0} -> nil
213+
# {[A], 0, 1} -> nil
214+
# {[A], 1, 0} -> if CO != nil, do: CO <> A, else: nil
215+
# {[A], 1, 1} -> if CO != nil, do: CO <> A, else: nil
216+
#
217+
# {[B, C, D], 0, 0} -> nil
218+
# {[B, C, D], 0, 1} -> nil
219+
# {[B, C, D], 1, 0} -> if CO != nil, do: CO <> B, else: nil
220+
# {[B, C, D], 1, 1} -> if CO != nil, do: CO <> B, else: nil
221+
222+
# {[A], 0, 0} -> {nil, [A]}
223+
# {[A], 0, 1} -> {A, []}
224+
# {[A], 1, _}, cat err -> {nil, []}
225+
# {[A], 1, 0}, cat ok -> {nil, [CO]}
226+
# {[A], 1, 1}, cat ok -> {CO, []}
227+
#
228+
# {[B, C, D], 0, 0} -> {nil, [B, C, D]}
229+
# {[B, C, D], 0, 1} -> {D, [B, C]}
230+
# {[B, C, D], 1, 0}, cat err -> {nil, [C, D]}
231+
# {[B, C, D], 1, 1}, cat err -> {D, [C]}
232+
# {[B, C, D], 1, 0}, cat ok -> {nil, [CO, C, D]}
233+
# {[B, C, D], 1, 1}, cat ok -> {D, [CO, C]}
234+
235+
# XXX what to do if we're ready to flush two?
236+
# depayloader
237+
# |> update_temporal_unit(obus, next_obu_fragment)
238+
# |> maybe_flush_temporal_unit(packet)
239+
240+
if new_temporal_unit? do
241+
{temporal_unit, depayloader} = flush_temporal_unit(depayloader)
242+
243+
{temporal_unit,
244+
update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
245+
else
246+
{nil, update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
247+
end
248+
end
249+
250+
defp append_obus([], tu), do: tu
251+
252+
defp append_obus([obu_binary | rest], tu) do
253+
case OBU.parse(obu_binary) do
254+
{:ok, obu, rest_of_binary} ->
255+
if rest_of_binary != <<>>, do: Logger.debug("WRITEME")
256+
257+
append_obus(rest, [obu | tu])
258+
259+
{:error, :invalid_av1_bitstream} ->
260+
Logger.debug("WRITEME")
261+
262+
append_obus(rest, tu)
263+
end
264+
end
265+
end

lib/ex_webrtc/rtp/av1/obu.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,13 @@ defmodule ExWebRTC.RTP.AV1.OBU do
7272

7373
defp parse_extension_header(_, _), do: {:error, :invalid_av1_bitstream}
7474

75-
defp parse_payload(0, rest), do: {:ok, rest, <<>>}
75+
# This can be reused to parse OBU elements from RTP packet payloads
76+
@spec parse_payload(0 | 1, binary()) ::
77+
{:ok, binary(), binary()} | {:error, :invalid_av1_bitstream}
78+
def parse_payload(s, payload)
79+
def parse_payload(0, rest), do: {:ok, rest, <<>>}
7680

77-
defp parse_payload(1, rest) do
81+
def parse_payload(1, rest) do
7882
with {:ok, leb128_size, payload_size} <- LEB128.read(rest),
7983
<<_::binary-size(leb128_size), payload::binary-size(payload_size), rest::binary>> <- rest do
8084
{:ok, payload, rest}

lib/ex_webrtc/rtp/av1/payload.ex

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,31 @@ defmodule ExWebRTC.RTP.AV1.Payload do
44
#
55
# Based on [RTP Payload Format for AV1](https://aomediacodec.github.io/av1-rtp-spec/v1.0.0.html).
66
#
7-
# RTP payload syntax:
7+
# RTP payload syntax (OBU element = OBU, or a fragment of an OBU):
88
# 0 1 2 3 4 5 6 7
99
# +-+-+-+-+-+-+-+-+
1010
# |Z|Y| W |N|-|-|-| (REQUIRED)
1111
# +=+=+=+=+=+=+=+=+ (REPEATED W-1 times, or any times if W = 0)
1212
# |1| |
13-
# +-+ OBU fragment|
13+
# +-+ OBU element |
1414
# |1| | (REQUIRED, leb128 encoded)
1515
# +-+ size |
1616
# |0| |
1717
# +-+-+-+-+-+-+-+-+
18-
# | OBU fragment |
18+
# | OBU element |
1919
# | ... |
2020
# +=+=+=+=+=+=+=+=+
2121
# | ... |
22-
# +=+=+=+=+=+=+=+=+ if W > 0, last fragment MUST NOT have size field
23-
# | OBU fragment |
22+
# +=+=+=+=+=+=+=+=+ if W > 0, last element MUST NOT have size field
23+
# | OBU element |
2424
# | ... |
2525
# +=+=+=+=+=+=+=+=+
2626

27+
require Logger
28+
29+
alias ExWebRTC.RTP.AV1.OBU
30+
alias ExWebRTC.Utils
31+
2732
@type t :: %__MODULE__{
2833
z: 0 | 1,
2934
y: 0 | 1,
@@ -93,4 +98,38 @@ defmodule ExWebRTC.RTP.AV1.Payload do
9398

9499
[first_obu_payload | next_obu_payloads]
95100
end
101+
102+
@doc """
103+
Depayloads OBU elements from the RTP AV1 packet payload.
104+
"""
105+
@spec depayload_obu_elements(t()) :: [binary()]
106+
def depayload_obu_elements(%__MODULE__{w: w, payload: payload}) do
107+
element_count = if w > 0, do: w, else: -1
108+
parse_obu_elements(payload, element_count)
109+
end
110+
111+
defp parse_obu_elements(data, count, obu_elements \\ [])
112+
113+
defp parse_obu_elements(<<>>, count, obu_elements) do
114+
if count > 0,
115+
do: Logger.warning("Invalid AV1 RTP payload: expected #{count} more OBU elements.")
116+
117+
Enum.reverse(obu_elements)
118+
end
119+
120+
defp parse_obu_elements(data, count, obu_elements) do
121+
s = Utils.to_int(count != 1)
122+
123+
case OBU.parse_payload(s, data) do
124+
{:ok, obu_element, rest} ->
125+
parse_obu_elements(rest, count - 1, [obu_element | obu_elements])
126+
127+
{:error, :invalid_av1_bitstream} ->
128+
Logger.warning("""
129+
Unable to parse OBU element from invalid AV1 bitstream. Dropping rest of AV1 RTP payload.\
130+
""")
131+
132+
parse_obu_elements(<<>>, count, obu_elements)
133+
end
134+
end
96135
end

lib/ex_webrtc/rtp/av1/payloader.ex

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ defmodule ExWebRTC.RTP.Payloader.AV1 do
1010

1111
@behaviour ExWebRTC.RTP.Payloader.Behaviour
1212

13+
require Logger
14+
1315
alias ExWebRTC.RTP.AV1.{OBU, Payload}
1416
alias ExWebRTC.Utils
1517

@@ -41,7 +43,12 @@ defmodule ExWebRTC.RTP.Payloader.AV1 do
4143
next_obus
4244

4345
_ ->
44-
raise "Invalid AV1 temporal unit: does not start with temporal delimiter OBU"
46+
Logger.warning("""
47+
Invalid AV1 temporal unit: does not start with temporal delimiter OBU. \
48+
Dropping temporal unit.\
49+
""")
50+
51+
[]
4552
end
4653

4754
# With the current implementation, each RTP packet will contain one OBU element.
@@ -72,7 +79,11 @@ defmodule ExWebRTC.RTP.Payloader.AV1 do
7279
parse_obus(rest, [obu | obus])
7380

7481
{:error, :invalid_av1_bitstream} ->
75-
raise "Invalid AV1 bitstream: unable to parse OBU"
82+
Logger.warning("""
83+
Unable to parse OBU from invalid AV1 bitstream. Dropping rest of temporal unit.\
84+
""")
85+
86+
parse_obus(<<>>, obus)
7687
end
7788
end
7889
end

0 commit comments

Comments
 (0)