Skip to content

Commit

Permalink
Relay RTMP messages to pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
lastcanal committed Oct 28, 2024
1 parent 39f8186 commit a277687
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 15 deletions.
18 changes: 13 additions & 5 deletions lib/membrane_rtmp_plugin/rtmp/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,23 @@ defmodule Membrane.RTMP.MessageHandler do
end

# A message containing stream metadata
defp do_handle_client_message(%Messages.SetDataFrame{} = _data_frame, _header, state) do
defp do_handle_client_message(%Messages.SetDataFrame{} = set_data_frame, _header, state) do
if pid = state.receiver_pid, do: send(pid, set_data_frame)
{:cont, state}
end

defp do_handle_client_message(%Messages.OnMetaData{} = _on_meta_data, _header, state) do
defp do_handle_client_message(%Messages.OnMetaData{} = on_meta_data, _header, state) do
if pid = state.receiver_pid, do: send(pid, on_meta_data)
{:cont, state}
end

# According to ffmpeg's documentation, this command should prepare the server to receive media streams
# We are simply acknowledging the message
defp do_handle_client_message(%Messages.FCPublish{}, _header, state) do
defp do_handle_client_message(%Messages.FCPublish{} = fc_publish, _header, state) do
%Messages.Anonymous{name: "onFCPublish", properties: []}
|> send_rtmp_payload(state.socket, chunk_stream_id: 3)

if pid = state.receiver_pid, do: send(pid, fc_publish)
{:cont, state}
end

Expand All @@ -186,14 +189,16 @@ defmodule Membrane.RTMP.MessageHandler do
|> Responses.default_result([:null, stream_id])
|> send_rtmp_payload(state.socket, chunk_stream_id: 3)

if pid = state.receiver_pid, do: send(pid, create_stream)
{:cont, state}
end

# we ignore acknowledgement messages, but they're rarely used anyways
defp do_handle_client_message(%module{}, _header, state)
defp do_handle_client_message(%module{} = msg, _header, state)
when module in [Messages.Acknowledgement, Messages.WindowAcknowledgement] do
Logger.debug("#{inspect(module)} received, ignoring as acknowledgements are not implemented")

if pid = state.receiver_pid, do: send(pid, msg)
{:cont, state}
end

Expand All @@ -214,10 +219,12 @@ defmodule Membrane.RTMP.MessageHandler do
defp do_handle_client_message(%Messages.UserControl{} = msg, _header, state) do
Logger.warning("Received unsupported user control message of type #{inspect(msg.event_type)}")

if pid = state.receiver_pid, do: send(pid, msg)
{:cont, state}
end

defp do_handle_client_message(%Messages.DeleteStream{}, _header, state) do
defp do_handle_client_message(%Messages.DeleteStream{} = delete_stream, _header, state) do
if pid = state.receiver_pid, do: send(pid, delete_stream)
{:halt, %{state | events: [:delete_stream | state.events]}}
end

Expand All @@ -234,6 +241,7 @@ defmodule Membrane.RTMP.MessageHandler do

defp do_handle_client_message(%Messages.Anonymous{} = message, _header, state) do
Logger.debug("Unknown message: #{inspect(message)}")
if pid = state.receiver_pid, do: send(pid, message)

{:cont, state}
end
Expand Down
8 changes: 7 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ defmodule Membrane.RTMPServer do
an input argument of the `c:#{inspect(ClientHandler)}.handle_init/1`. Otherwise, an empty
map is passed to the `c:#{inspect(ClientHandler)}.handle_init/1`.
"""
@type client_behaviour_spec :: ClientHandler.t() | {ClientHandler.t(), opts :: any()}

@type receiver_pid :: pid() | nil

@type client_behaviour_spec ::
ClientHandler.t()
| {ClientHandler.t(), opts :: any()}
| {ClientHandler.t(), opts :: any(), receiver_pid}

@type server_identifier :: pid() | atom()

Expand Down
18 changes: 11 additions & 7 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ defmodule Membrane.RTMPServer.ClientHandler do
raise "handle_new_client is not a function"
end

{handler_module, opts} =
{handler_module, opts, pid} =
case state.handle_new_client.(self(), state.app, stream_key) do
{handler_module, opts} -> {handler_module, opts}
handler_module -> {handler_module, %{}}
{handler_module, opts, pid} when is_pid(pid) -> {handler_module, opts, pid}
{handler_module, opts} -> {handler_module, opts, nil}
handler_module -> {handler_module, %{}, nil}
end

Process.send_after(
Expand All @@ -177,10 +178,14 @@ defmodule Membrane.RTMPServer.ClientHandler do
state
| notified_about_client?: true,
handler: handler_module,
handler_state: handler_module.handle_init(opts)
handler_state: handler_module.handle_init(opts),
message_handler_state: %{message_handler_state | receiver_pid: pid}
}
else
state
%{
state
| message_handler_state: message_handler_state
}
end

state = Enum.reduce(events, state, &handle_event/2)
Expand All @@ -190,8 +195,7 @@ defmodule Membrane.RTMPServer.ClientHandler do
{:noreply,
%{
state
| message_parser_state: message_parser_state,
message_handler_state: message_handler_state
| message_parser_state: message_parser_state
}}
end

Expand Down
57 changes: 55 additions & 2 deletions test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,53 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
assert :ok = Task.await(ffmpeg_task)
end

test "Messages are sent to receiver_pid" do
self = self()

pipeline_startup_task =
Task.async(fn ->
start_pipeline_with_external_rtmp_server(@app, @stream_key, self, 0, false, self)
end)

port =
receive do
{:port, port} -> port
end

ffmpeg_task =
Task.async(fn ->
"rtmp://localhost:#{port}/#{@app}/#{@stream_key}" |> start_ffmpeg()
end)

pipeline = Task.await(pipeline_startup_task)

assert_receive(%Membrane.RTMP.Messages.SetDataFrame{})

assert_buffers(%{
pipeline: pipeline,
sink: :video_sink,
stream_length: @stream_length_ms,
buffers_expected: div(@stream_length_ms, @video_frame_duration_ms)
})

assert_buffers(%{
pipeline: pipeline,
sink: :audio_sink,
stream_length: @stream_length_ms,
buffers_expected: div(@stream_length_ms, @audio_frame_duration_ms)
})

assert_end_of_stream(pipeline, :audio_sink, :input)
assert_end_of_stream(pipeline, :video_sink, :input)

assert_received(%Membrane.RTMP.Messages.Anonymous{})
assert_received(%Membrane.RTMP.Messages.DeleteStream{})

# Cleanup
Testing.Pipeline.terminate(pipeline)
assert :ok = Task.await(ffmpeg_task)
end

defp start_pipeline_with_builtin_rtmp_server(app, stream_key, use_ssl? \\ false) do
options = [
module: Membrane.RTMP.Source.WithBuiltinServerTestPipeline,
Expand All @@ -216,13 +263,19 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
stream_key,
parent,
port \\ 0,
use_ssl? \\ false
use_ssl? \\ false,
receiver_pid \\ nil
) do
parent_process_pid = self()

handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
Membrane.RTMP.Source.ClientHandlerImpl

if receiver_pid do
{Membrane.RTMP.Source.ClientHandlerImpl, %{}, receiver_pid}
else
Membrane.RTMP.Source.ClientHandlerImpl
end
end

{:ok, server_pid} =
Expand Down

0 comments on commit a277687

Please sign in to comment.