Skip to content

Commit

Permalink
Merge pull request #12 from membraneframework-labs/use_new_rtmp_api
Browse files Browse the repository at this point in the history
Use new rtmp api
  • Loading branch information
bartkrak authored Jul 31, 2024
2 parents 9b28379 + 64d9955 commit 13870e1
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 74 deletions.
4 changes: 4 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ workflows:
build:
jobs:
- elixir/build_test:
cache-version: 3
filters: &filters
tags:
only: /v.*/
- elixir/test:
cache-version: 3
filters:
<<: *filters
- elixir/lint:
cache-version: 3
filters:
<<: *filters
- elixir/hex_publish:
cache-version: 3
requires:
- elixir/build_test
- elixir/test
Expand Down
25 changes: 5 additions & 20 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ defmodule Boombox.Pipeline do
spec_builder: [],
track_builders: nil,
last_result: nil,
eos_info: nil,
rtmp_input_state: nil
eos_info: nil
]

@typedoc """
Expand Down Expand Up @@ -87,8 +86,7 @@ defmodule Boombox.Pipeline do
spec_builder: Membrane.ChildrenSpec.t(),
track_builders: Boombox.Pipeline.track_builders() | nil,
last_result: Boombox.Pipeline.Ready.t() | Boombox.Pipeline.Wait.t() | nil,
eos_info: term(),
rtmp_input_state: Boombox.RTMP.state()
eos_info: term()
}
end

Expand All @@ -115,17 +113,6 @@ defmodule Boombox.Pipeline do
|> proceed_result(ctx, state)
end

@impl true
def handle_child_notification(
{:socket_control_needed, _socket, source_pid},
:rtmp_source,
ctx,
state
) do
Boombox.RTMP.handle_socket_control(source_pid, state.rtmp_input_state)
|> proceed_result(ctx, state)
end

@impl true
def handle_child_notification({:new_tracks, tracks}, :webrtc_output, ctx, state) do
%{status: :awaiting_output_link} = state
Expand Down Expand Up @@ -158,11 +145,9 @@ defmodule Boombox.Pipeline do
end

@impl true
def handle_info({:rtmp_tcp_server, server_pid, socket}, ctx, state) do
{result, rtmp_input_state} =
Boombox.RTMP.handle_connection(server_pid, socket, state.rtmp_input_state)

proceed_result(result, ctx, %{state | rtmp_input_state: rtmp_input_state})
def handle_info({:rtmp_client_ref, client_ref}, ctx, state) do
Boombox.RTMP.handle_connection(client_ref)
|> proceed_result(ctx, state)
end

@impl true
Expand Down
56 changes: 23 additions & 33 deletions lib/boombox/rtmp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,45 @@ defmodule Boombox.RTMP do
@moduledoc false

import Membrane.ChildrenSpec
require Membrane.Logger
alias Boombox.Pipeline.{Ready, Wait}
alias Membrane.RTMP

@type state :: %{server_pid: pid()} | nil

@spec create_input(URI.t(), pid()) :: Wait.t()
@spec create_input(String.t(), pid()) :: Wait.t()
def create_input(uri, utility_supervisor) do
uri = URI.new!(uri)
{:ok, ip} = :inet.getaddr(~c"#{uri.host}", :inet)
{use_ssl?, port, target_app, target_stream_key} = RTMP.Utils.parse_url(uri)

boombox = self()

server_options = %Membrane.RTMP.Source.TcpServer{
port: uri.port,
listen_options: [:binary, packet: :raw, active: false, ip: ip],
socket_handler: fn socket ->
send(boombox, {:rtmp_tcp_server, self(), socket})

receive do
{:rtmp_source_pid, pid} -> {:ok, pid}
:rtmp_already_connected -> {:error, :rtmp_already_connected}
end
new_client_callback = fn client_ref, app, stream_key ->
if app == target_app and stream_key == target_stream_key do
send(boombox, {:rtmp_client_ref, client_ref})
else
Membrane.Logger.warning("Unexpected client connected on /#{app}/#{stream_key}")
end
end

server_options = %{
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
port: port,
use_ssl?: use_ssl?,
new_client_callback: new_client_callback,
client_timeout: 1_000
}

{:ok, _pid} =
{:ok, _server} =
Membrane.UtilitySupervisor.start_link_child(
utility_supervisor,
{Membrane.RTMP.Source.TcpServer, server_options}
{Membrane.RTMP.Server, server_options}
)

%Wait{}
end

@spec handle_connection(pid(), :gen_tcp.socket() | :ssl.sslsocket(), state()) ::
{Ready.t(), state()}
def handle_connection(server_pid, socket, nil = _state) do
@spec handle_connection(pid()) :: Ready.t()
def handle_connection(client_ref) do
spec = [
child(:rtmp_source, %Membrane.RTMP.SourceBin{socket: socket})
child(:rtmp_source, %Membrane.RTMP.SourceBin{client_ref: client_ref})
|> via_out(:audio)
|> child(Membrane.AAC.Parser)
|> child(:aac_decoder, Membrane.AAC.FDK.Decoder)
Expand All @@ -50,17 +51,6 @@ defmodule Boombox.RTMP do
video: get_child(:rtmp_source) |> via_out(:video)
}

{%Ready{spec_builder: spec, track_builders: track_builders}, %{server_pid: server_pid}}
end

def handle_connection(_server_pid, _socket, state) do
send(state.server_pid, :rtmp_already_connected)
{%Wait{}, state}
end

@spec handle_socket_control(pid(), state()) :: Wait.t()
def handle_socket_control(source_pid, state) do
send(state.server_pid, {:rtmp_source_pid, source_pid})
%Wait{}
%Ready{spec_builder: spec, track_builders: track_builders}
end
end
4 changes: 3 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ defmodule Boombox.Mixfile do
{:membrane_mp4_plugin,
github: "membraneframework/membrane_mp4_plugin", branch: "isom-avc3"},
{:membrane_realtimer_plugin, ">= 0.0.0"},
{:membrane_rtmp_plugin, ">= 0.0.0"},
# {:membrane_rtmp_plugin, ">= 0.0.0"},
{:membrane_rtmp_plugin,
github: "membraneframework/membrane_rtmp_plugin", branch: "handle_new_client"},
{:membrane_ffmpeg_swresample_plugin, ">= 0.0.0"},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
Expand Down
Loading

0 comments on commit 13870e1

Please sign in to comment.