Skip to content

Commit

Permalink
SourceBin dynamic pads
Browse files Browse the repository at this point in the history
  • Loading branch information
bartkrak committed Jul 3, 2024
1 parent 89309aa commit 4b6cb9a
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 20 deletions.
113 changes: 93 additions & 20 deletions lib/membrane_rtmp_plugin/rtmp/source/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Membrane.RTMP.SourceBin do

def_output_pad :video,
accepted_format: H264,
availability: :always
availability: :on_request

def_output_pad :audio,
accepted_format: AAC,
availability: :always
availability: :on_request

def_options socket: [
spec: :gen_tcp.socket() | :ssl.sslsocket(),
Expand Down Expand Up @@ -57,34 +57,101 @@ defmodule Membrane.RTMP.SourceBin do

@impl true
def handle_init(_ctx, %__MODULE__{} = opts) do
structure = [
spec = [
child(:src, %RTMP.Source{
socket: opts.socket,
validator: opts.validator,
use_ssl?: opts.use_ssl?
})
|> child(:demuxer, Membrane.FLV.Demuxer),
#
child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none
}),
child(:video_parser, Membrane.H264.Parser),
#
get_child(:demuxer)
|> via_out(Pad.ref(:audio, 0))
|> get_child(:audio_parser)
|> bin_output(:audio),
#
get_child(:demuxer)
|> via_out(Pad.ref(:video, 0))
|> get_child(:video_parser)
|> bin_output(:video)
|> child(:demuxer, Membrane.FLV.Demuxer)
]

{[spec: structure], %{}}
state = %{
demuxer_audio_pad_ref: nil,
demuxer_video_pad_ref: nil
}

{[spec: spec], state}
end

@impl true
def handle_pad_added(Pad.ref(:audio, _ref) = pad, _ctx, state) do
spec =
if state.demuxer_audio_pad_ref != nil do
[
get_child(:demuxer)
|> via_out(state.demuxer_audio_pad_ref)
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none
})
|> bin_output(pad)
]
else
[
child(:funnel_audio, Membrane.Funnel)
|> bin_output(pad)
]
end

{[spec: spec], state}
end

def handle_pad_added(Pad.ref(:video, _ref) = pad, _ctx, state) do
spec =
if state.demuxer_video_pad_ref != nil do
[
get_child(:demuxer)
|> via_out(state.demuxer_video_pad_ref)
|> child(:video_parser, Membrane.H264.Parser)
|> bin_output(pad)
]
else
[
child(:funnel_video, Membrane.Funnel)
|> bin_output(pad)
]
end

{[spec: spec], state}
end

@impl true
def handle_child_notification({:new_stream, pad_ref, :AAC}, :demuxer, ctx, state) do
audio_pad_ref = get_pad(:audio, ctx)

if audio_pad_ref != nil do
{[
spec: [
get_child(:demuxer)
|> via_out(pad_ref)
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none
})
|> get_child(:funnel_audio)
]
], state}
else
{[], %{state | demuxer_audio_pad_ref: pad_ref}}
end
end

def handle_child_notification({:new_stream, pad_ref, :H264}, :demuxer, ctx, state) do
video_pad_ref = get_pad(:video, ctx)

if video_pad_ref != nil do
{[
spec: [
get_child(:demuxer)
|> via_out(pad_ref)
|> child(:video_parser, Membrane.H264.Parser)
|> get_child(:funnel_video)
]
], state}
else
{[], %{state | demuxer_video_pad_ref: pad_ref}}
end
end

def handle_child_notification(
{type, _socket, _pid} = notification,
:src,
Expand Down Expand Up @@ -128,4 +195,10 @@ defmodule Membrane.RTMP.SourceBin do
def secure_pass_control(socket, source) do
:ssl.controlling_process(socket, source)
end

defp get_pad(name, ctx) do
ctx.pads
|> Map.keys()
|> Enum.find(fn pad_ref -> Pad.name_by_ref(pad_ref) == name end)
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ defmodule Membrane.RTMP.Mixfile do
{:membrane_aac_plugin, "~> 0.18.1"},
{:membrane_flv_plugin, "~> 0.12.0"},
{:membrane_file_plugin, "~> 0.17.0"},
{:membrane_funnel_plugin, "~> 0.9.0"},
# testing
{:membrane_hackney_plugin, "~> 0.11.0", only: :test},
{:ffmpex, "~> 0.10.0", only: :test},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
"membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"},
"membrane_core": {:hex, :membrane_core, "1.0.1", "08aa546c0d131c66f8b906b3dfb2b8f2749b56859f6fc52bd3ac846b944b3baa", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a35ed68561bdf0a2dbb2f994333be78cf4e1c4d734e4cd927d77d92049bb1273"},
"membrane_fake_plugin": {:hex, :membrane_fake_plugin, "0.11.0", "3a2d26f15ad4940a4d44cee3354dff38fa9a39963e9b2dcb49802e150ff9a9dc", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c6b6a06eaa4e820d1e4836510ddb4bcb386c8918d0b37542a21caf6b87cbe72"},
"membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.0", "e855a848e84eaed537b41fd4436712038fc5518059eadc8609c83cd2d819653a", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "9c3653ca9f13bb409b36257d6094798d4625c739ab7a4035c12308622eb16e0b"},
"membrane_flv_plugin": {:hex, :membrane_flv_plugin, "0.12.0", "d715ad405af86dcaf4b2f479e34088e1f6738c7280366828e1066b39d2aa493a", [:mix], [{:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}], "hexpm", "a317872d6d394e550c7bfd8979f12a3a1cc1e89b547d75360321025b403d3279"},
"membrane_funnel_plugin": {:hex, :membrane_funnel_plugin, "0.9.0", "9cfe09e44d65751f7d9d8d3c42e14797f7be69e793ac112ea63cd224af70a7bf", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "988790aca59d453a6115109f050699f7f45a2eb6a7f8dc5c96392760cddead54"},
"membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"},
"membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"},
"membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.0", "0a7c6b9a7678e8c111b22b5417465ac31cf6e598cff6a53ab53a9c379bdfa1ef", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "e9cde8c8995ace9fc26355037cbcc780f1727a3f63d36c21b52232fd29d0ad40"},
Expand Down

0 comments on commit 4b6cb9a

Please sign in to comment.