From 17756f8c525e8ebeb963f0ae70e0e9f53c811be4 Mon Sep 17 00:00:00 2001 From: Philip Giuliani Date: Tue, 8 Oct 2024 16:46:10 +0200 Subject: [PATCH] Require passing the packager from the outside --- examples/rtmp.exs | 13 ++++++-- lib/membrane/hls/sink_bin.ex | 50 ++++++++--------------------- test/membrane/hls/sink_bin_test.exs | 15 +++++++-- 3 files changed, 36 insertions(+), 42 deletions(-) diff --git a/examples/rtmp.exs b/examples/rtmp.exs index 0c25fba..d7666e8 100644 --- a/examples/rtmp.exs +++ b/examples/rtmp.exs @@ -15,6 +15,16 @@ defmodule Pipeline do def handle_init(_ctx, _opts) do File.rm_rf("tmp") + {:ok, packager_pid} = + Agent.start_link(fn -> + HLS.Packager.new( + storage: HLS.Storage.File.new(), + manifest_uri: URI.new!("file://tmp/stream.m3u8"), + resume_finished_tracks: true, + restore_pending_segments: false + ) + end) + structure = [ # Source child(:source, %Membrane.RTMP.Source{ @@ -24,9 +34,8 @@ defmodule Pipeline do # Sink child(:sink, %Membrane.HLS.SinkBin{ - manifest_uri: URI.new!("file://tmp/stream.m3u8"), + packager_pid: packager_pid, target_segment_duration: Membrane.Time.seconds(7), - storage: HLS.Storage.File.new() }), # Audio diff --git a/lib/membrane/hls/sink_bin.ex b/lib/membrane/hls/sink_bin.ex index b8cc69e..7226d90 100644 --- a/lib/membrane/hls/sink_bin.ex +++ b/lib/membrane/hls/sink_bin.ex @@ -9,18 +9,10 @@ defmodule Membrane.HLS.SinkBin do require Membrane.Logger def_options( - manifest_uri: [ - spec: URI.t(), + packager_pid: [ + spec: pid(), description: """ - Destination URI of the manifest. - Example: file://output/stream.m3u8 - """ - ], - storage: [ - spec: HLS.Storage, - required: true, - description: """ - Implementation of the storage. + PID of a `HLS.Packager` which must be wrapped in an Agent (for now). """ ], target_segment_duration: [ @@ -79,27 +71,11 @@ defmodule Membrane.HLS.SinkBin do %{ opts: opts, flush: opts.flush_on_end, - packager_pid: nil, ended_sinks: MapSet.new(), live_state: nil }} end - @impl true - def handle_setup(_context, state) do - {:ok, packager_pid} = - Agent.start_link(fn -> - Packager.new( - storage: state.opts.storage, - manifest_uri: state.opts.manifest_uri, - resume_finished_tracks: true, - restore_pending_segments: false - ) - end) - - {[], %{state | packager_pid: packager_pid}} - end - @impl true def handle_element_start_of_stream( child = {:muxer, _}, @@ -128,7 +104,7 @@ defmodule Membrane.HLS.SinkBin do %{pad_options: %{encoding: :AAC} = pad_opts}, state ) do - {_max_pts, track_pts} = resume_info(state.packager_pid, track_id) + {_max_pts, track_pts} = resume_info(state.opts.packager_pid, track_id) spec = bin_input(pad) @@ -139,7 +115,7 @@ defmodule Membrane.HLS.SinkBin do }) |> via_out(Pad.ref(:output), options: [tracks: [track_id]]) |> child({:sink, track_id}, %Membrane.HLS.CMAFSink{ - packager_pid: state.packager_pid, + packager_pid: state.opts.packager_pid, track_id: track_id, target_segment_duration: state.opts.target_segment_duration, build_stream: pad_opts.build_stream @@ -154,7 +130,7 @@ defmodule Membrane.HLS.SinkBin do %{pad_options: %{encoding: :H264} = pad_opts}, state ) do - {_max_pts, track_pts} = resume_info(state.packager_pid, track_id) + {_max_pts, track_pts} = resume_info(state.opts.packager_pid, track_id) spec = bin_input(pad) @@ -163,7 +139,7 @@ defmodule Membrane.HLS.SinkBin do segment_min_duration: pad_opts.segment_duration }) |> child({:sink, track_id}, %Membrane.HLS.CMAFSink{ - packager_pid: state.packager_pid, + packager_pid: state.opts.packager_pid, track_id: track_id, target_segment_duration: state.opts.target_segment_duration, build_stream: pad_opts.build_stream @@ -177,7 +153,7 @@ defmodule Membrane.HLS.SinkBin do %{pad_options: %{encoding: :TEXT} = pad_opts}, state ) do - {_max_pts, track_pts} = resume_info(state.packager_pid, track_id) + {_max_pts, track_pts} = resume_info(state.opts.packager_pid, track_id) spec = bin_input(pad) @@ -193,7 +169,7 @@ defmodule Membrane.HLS.SinkBin do ] }) |> child({:sink, track_id}, %Membrane.HLS.WebVTTSink{ - packager_pid: state.packager_pid, + packager_pid: state.opts.packager_pid, track_id: track_id, target_segment_duration: state.opts.target_segment_duration, build_stream: pad_opts.build_stream @@ -212,7 +188,7 @@ defmodule Membrane.HLS.SinkBin do |> put_in([:live_state], %{stop: true}) |> put_in([:ended_sinks], ended_sinks) - if state.flush, do: Agent.update(state.packager_pid, &Packager.flush/1, :infinity) + if state.flush, do: Agent.update(state.opts.packager_pid, &Packager.flush/1, :infinity) {[notify_parent: :end_of_stream], state} else @@ -227,7 +203,7 @@ defmodule Membrane.HLS.SinkBin do @impl true def handle_parent_notification(:flush, ctx, state) do if not state.flush and all_streams_ended?(ctx, state.ended_sinks) do - Agent.update(state.packager_pid, &Packager.flush/1, :infinity) + Agent.update(state.opts.packager_pid, &Packager.flush/1, :infinity) {[notify_parent: :end_of_stream], %{state | flush: true}} else {[], %{state | flush: true}} @@ -245,7 +221,7 @@ defmodule Membrane.HLS.SinkBin do ) Agent.update( - state.packager_pid, + state.opts.packager_pid, fn p -> Packager.sync(p, state.live_state.next_sync_point) end, @@ -305,7 +281,7 @@ defmodule Membrane.HLS.SinkBin do # Tells where in the playlist we should start issuing segments. next_sync_point = Agent.get( - state.packager_pid, + state.opts.packager_pid, &Packager.next_sync_point( &1, Membrane.Time.as_seconds(state.opts.target_segment_duration, :round) diff --git a/test/membrane/hls/sink_bin_test.exs b/test/membrane/hls/sink_bin_test.exs index d50b686..93e05b8 100644 --- a/test/membrane/hls/sink_bin_test.exs +++ b/test/membrane/hls/sink_bin_test.exs @@ -6,11 +6,20 @@ defmodule Membrane.HLS.SinkBinTest do @tag :tmp_dir test "on a new stream", %{tmp_dir: tmp_dir} do + {:ok, packager_pid} = + Agent.start_link(fn -> + HLS.Packager.new( + manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"), + storage: HLS.Storage.File.new(), + resume_finished_tracks: true, + restore_pending_segments: false + ) + end) + spec = [ child(:sink, %Membrane.HLS.SinkBin{ - manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"), - target_segment_duration: Membrane.Time.seconds(7), - storage: HLS.Storage.File.new() + packager_pid: packager_pid, + target_segment_duration: Membrane.Time.seconds(7) }), # Source