diff --git a/lib/membrane/rtp/tcp_depacketizer.ex b/lib/membrane/rtp/tcp_depacketizer.ex new file mode 100644 index 00000000..4c638351 --- /dev/null +++ b/lib/membrane/rtp/tcp_depacketizer.ex @@ -0,0 +1,51 @@ +defmodule Membrane.RTP.TCP.Depacketizer do + @moduledoc """ + This element provides functionality of serializing RTP and RTCP packets into a bytestream + that can be send over TCP connection. The encapsulation is described in RFC 4571. + + Packets in the stream will have the following structure: + [Length :: 2 bytes][packet :: bytes] + """ + use Membrane.Filter + + require Logger + alias Membrane.{Buffer, RemoteStream, RTP} + + require Membrane.Logger + + def_input_pad :input, accepted_format: %RemoteStream{type: :packetized, content_format: RTP} + + def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream} + + @impl true + def handle_init(_ctx, _opts) do + {[], %{}} + end + + @impl true + def handle_playing(_ctx, state) do + stream_format = %RemoteStream{type: :bytestream} + {[stream_format: {:output, stream_format}], state} + end + + @impl true + def handle_stream_format(:input, _stream_format, _ctx, state) do + {[], state} + end + + @impl true + def handle_buffer(:input, %Buffer{payload: payload, metadata: metadata}, _ctx, state) do + len_bytes = + case :binary.encode_unsigned(byte_size(payload), :big) do + <> -> <<0, len>> + <> -> len + end + + buffer = %Buffer{ + payload: len_bytes <> payload, + metadata: metadata + } + + {[buffer: {:output, [buffer]}], state} + end +end diff --git a/lib/membrane/rtp/tcp_packetizer.ex b/lib/membrane/rtp/tcp_packetizer.ex new file mode 100644 index 00000000..1248b849 --- /dev/null +++ b/lib/membrane/rtp/tcp_packetizer.ex @@ -0,0 +1,66 @@ +defmodule Membrane.RTP.TCP.Packetizer do + @moduledoc """ + This element provides functionality of packetizing bytestream from TCP + into RTP and RTCP Packets. The encapsulation is described in RFC 4571. + + Packets in the stream will have the following structure: + [Length :: 2 bytes][packet :: bytes] + """ + use Membrane.Filter + + require Logger + alias Membrane.{Buffer, RemoteStream, RTP} + require Membrane.Logger + + def_input_pad :input, accepted_format: %RemoteStream{type: :bytestream} + + def_output_pad :output, accepted_format: %RemoteStream{type: :packetized, content_format: RTP} + + @impl true + def handle_init(_ctx, _opts) do + {[], %{unprocessed_data: <<>>}} + end + + @impl true + def handle_playing(_ctx, state) do + stream_format = %RemoteStream{type: :packetized, content_format: RTP} + {[stream_format: {:output, stream_format}], state} + end + + @impl true + def handle_stream_format(:input, _stream_format, _ctx, state) do + {[], state} + end + + @impl true + def handle_buffer(:input, %Buffer{payload: payload, metadata: metadata}, _ctx, state) do + packets_binary = state.unprocessed_data <> payload + + {unprocessed_data, complete_packets_binaries} = get_complete_packets(packets_binary) + + packets_buffers = + Enum.map(complete_packets_binaries, &%Buffer{payload: &1, metadata: metadata}) + + {[buffer: {:output, packets_buffers}], %{state | unprocessed_data: unprocessed_data}} + end + + @spec get_complete_packets(binary()) :: + {unprocessed_data :: binary(), complete_packets :: [binary()]} + defp get_complete_packets(packets_binary, complete_packets \\ []) + + defp get_complete_packets(packets_binary, complete_packets) + when byte_size(packets_binary) <= 2 do + {packets_binary, Enum.reverse(complete_packets)} + end + + defp get_complete_packets(packets_binary, complete_packets) do + <> = packets_binary + + if payload_length > byte_size(rest) do + {packets_binary, Enum.reverse(complete_packets)} + else + <> = rest + get_complete_packets(rest, [complete_packet_binary | complete_packets]) + end + end +end