Skip to content

Commit

Permalink
Implement elements for packetizing and depacketzing RTP packets into …
Browse files Browse the repository at this point in the history
…connection-oriented transport
  • Loading branch information
wkozyra95 committed Feb 22, 2024
1 parent a70eee2 commit 8117047
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 0 deletions.
51 changes: 51 additions & 0 deletions lib/membrane/rtp/tcp_depacketizer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Membrane.RTP.TCP.Depacketizer do
@moduledoc """
This element provides functionality of serializing RTP and RTCP packets into a bytestream
that can be send over TCP connection. The encapsulation is described in RFC 4571.
Packets in the stream will have the following structure:
[Length :: 2 bytes][packet :: <Length> bytes]
"""
use Membrane.Filter

require Logger
alias Membrane.{Buffer, RemoteStream, RTP}

require Membrane.Logger

def_input_pad :input, accepted_format: %RemoteStream{type: :packetized, content_format: RTP}

def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}

@impl true
def handle_init(_ctx, _opts) do
{[], %{}}
end

@impl true
def handle_playing(_ctx, state) do
stream_format = %RemoteStream{type: :bytestream}
{[stream_format: {:output, stream_format}], state}
end

@impl true
def handle_stream_format(:input, _stream_format, _ctx, state) do
{[], state}
end

@impl true
def handle_buffer(:input, %Buffer{payload: payload, metadata: metadata}, _ctx, state) do
len_bytes =
case :binary.encode_unsigned(byte_size(payload), :big) do
<<len::size(8)>> -> <<0, len>>
<<len::binary-size(2)>> -> len
end

buffer = %Buffer{
payload: len_bytes <> payload,
metadata: metadata
}

{[buffer: {:output, [buffer]}], state}
end
end
66 changes: 66 additions & 0 deletions lib/membrane/rtp/tcp_packetizer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule Membrane.RTP.TCP.Packetizer do
@moduledoc """
This element provides functionality of packetizing bytestream from TCP
into RTP and RTCP Packets. The encapsulation is described in RFC 4571.
Packets in the stream will have the following structure:
[Length :: 2 bytes][packet :: <Length> bytes]
"""
use Membrane.Filter

require Logger
alias Membrane.{Buffer, RemoteStream, RTP}
require Membrane.Logger

def_input_pad :input, accepted_format: %RemoteStream{type: :bytestream}

def_output_pad :output, accepted_format: %RemoteStream{type: :packetized, content_format: RTP}

@impl true
def handle_init(_ctx, _opts) do
{[], %{unprocessed_data: <<>>}}
end

@impl true
def handle_playing(_ctx, state) do
stream_format = %RemoteStream{type: :packetized, content_format: RTP}
{[stream_format: {:output, stream_format}], state}
end

@impl true
def handle_stream_format(:input, _stream_format, _ctx, state) do
{[], state}
end

@impl true
def handle_buffer(:input, %Buffer{payload: payload, metadata: metadata}, _ctx, state) do
packets_binary = state.unprocessed_data <> payload

{unprocessed_data, complete_packets_binaries} = get_complete_packets(packets_binary)

packets_buffers =
Enum.map(complete_packets_binaries, &%Buffer{payload: &1, metadata: metadata})

{[buffer: {:output, packets_buffers}], %{state | unprocessed_data: unprocessed_data}}
end

@spec get_complete_packets(binary()) ::
{unprocessed_data :: binary(), complete_packets :: [binary()]}
defp get_complete_packets(packets_binary, complete_packets \\ [])

defp get_complete_packets(packets_binary, complete_packets)
when byte_size(packets_binary) <= 2 do
{packets_binary, Enum.reverse(complete_packets)}
end

defp get_complete_packets(packets_binary, complete_packets) do
<<payload_length::size(16), rest::binary>> = packets_binary

if payload_length > byte_size(rest) do
{packets_binary, Enum.reverse(complete_packets)}
else
<<complete_packet_binary::binary-size(payload_length)-unit(8), rest::binary>> = rest
get_complete_packets(rest, [complete_packet_binary | complete_packets])
end
end
end

0 comments on commit 8117047

Please sign in to comment.