From 666d5b1f2b959df49590ff2578b09599c1cd88da Mon Sep 17 00:00:00 2001 From: notedit Date: Tue, 26 Feb 2019 13:03:39 +0800 Subject: [PATCH] refactor mediaframe listener Former-commit-id: 02b8b52b3a774d50cb682d72f811af8221a9c7cc [formerly 703891639a4a379cc2e1a9ac626df9926f747b77] [formerly d4ce2ce69a1f7c0ee80a288d59a379779d1f54a7 [formerly eed4f7572aa7dd86ded8429963aa99ad97cd1157]] Former-commit-id: 37d87f97f7b5bb7f32987f83c6fd00fc5f430d44 [formerly a97a6bfd101a040aca2f9e1b137fca940ff542da] Former-commit-id: 06ae356347d0a403df12393ba2765ec0235a016c --- incomingstreamtrack.go | 35 +++++++++++++++++++++++++---------- mediastreamduplicater.go | 28 ++++++++++++++++------------ 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/incomingstreamtrack.go b/incomingstreamtrack.go index d5d91e6..76b1034 100644 --- a/incomingstreamtrack.go +++ b/incomingstreamtrack.go @@ -55,16 +55,17 @@ type IncomingTrackStopListener func() // IncomingStreamTrack Audio or Video track of a remote media stream type IncomingStreamTrack struct { - id string - media string - receiver native.RTPReceiverFacade - counter int - encodings []*Encoding - trackInfo *sdp.TrackInfo - stats map[string]*IncomingAllStats - onStopListeners []func() - onAttachedListeners []func() - onDetachedListeners []func() + id string + media string + receiver native.RTPReceiverFacade + counter int + encodings []*Encoding + trackInfo *sdp.TrackInfo + stats map[string]*IncomingAllStats + mediaStreamDuplicater *MediaStreamDuplicater + onStopListeners []func() + onAttachedListeners []func() + onDetachedListeners []func() } // IncomingStats info @@ -482,6 +483,15 @@ func (i *IncomingStreamTrack) OnStop(stop func()) { i.onStopListeners = append(i.onStopListeners, stop) } +func (i *IncomingStreamTrack) OnMediaFrame(listener func([]byte, uint)) { + + if i.mediaStreamDuplicater == nil { + i.mediaStreamDuplicater = NewMediaStreamDuplicater(i) + } + + i.mediaStreamDuplicater.SetMediaFrameListener(listener) +} + // Stop Removes the track from the incoming stream and also detaches any attached outgoing track or recorder func (i *IncomingStreamTrack) Stop() { @@ -499,6 +509,11 @@ func (i *IncomingStreamTrack) Stop() { } } + if i.mediaStreamDuplicater != nil { + i.mediaStreamDuplicater.Stop() + i.mediaStreamDuplicater = nil + } + for _, stopFunc := range i.onStopListeners { stopFunc() } diff --git a/mediastreamduplicater.go b/mediastreamduplicater.go index 6d12ef5..5bad62b 100644 --- a/mediastreamduplicater.go +++ b/mediastreamduplicater.go @@ -2,6 +2,7 @@ package mediaserver import "C" import ( + "fmt" "unsafe" native "github.com/notedit/media-server-go/wrapper" @@ -9,10 +10,11 @@ import ( // MediaStreamDuplicater we can make a copy of the incoming stream and callback the mediaframe data type MediaStreamDuplicater struct { - MediaFrames chan []byte - track *IncomingStreamTrack - duplicater native.MediaStreamDuplicaterFacade - listener mediaframeListener + track *IncomingStreamTrack + duplicater native.MediaStreamDuplicaterFacade + listener mediaframeListener // used for native wrapper, see swig's doc + + mediaframeListener func([]byte, uint) // used for outside } type mediaframeListener interface { @@ -35,15 +37,17 @@ type overwrittenMediaFrameListener struct { func (p *overwrittenMediaFrameListener) OnMediaFrame(frame native.MediaFrame) { - if p.duplicater != nil { + if p.duplicater != nil && p.duplicater.mediaframeListener != nil { buffer := C.GoBytes(unsafe.Pointer(frame.GetData()), C.int(frame.GetLength())) if frame.GetType() == native.MediaFrameVideo { data, err := annexbConvert(buffer) if err == nil { - p.duplicater.MediaFrames <- data + p.duplicater.mediaframeListener(data, frame.GetTimeStamp()) + } else { + fmt.Println(err) } } else { - p.duplicater.MediaFrames <- buffer + p.duplicater.mediaframeListener(buffer, frame.GetTimeStamp()) } } @@ -59,10 +63,6 @@ func NewMediaStreamDuplicater(track *IncomingStreamTrack) *MediaStreamDuplicater source := track.GetFirstEncoding().GetSource() duplicater.duplicater = native.NewMediaStreamDuplicaterFacade(source) - track.OnStop(func() { - duplicater.Stop() - }) - listener := &overwrittenMediaFrameListener{ duplicater: duplicater, } @@ -73,10 +73,14 @@ func NewMediaStreamDuplicater(track *IncomingStreamTrack) *MediaStreamDuplicater duplicater.duplicater.AddMediaListener(duplicater.listener) - duplicater.MediaFrames = make(chan []byte, 5) return duplicater } +// SetMediaFrameListener set outside mediaframe listener +func (d *MediaStreamDuplicater) SetMediaFrameListener(listener func([]byte, uint)) { + d.mediaframeListener = listener +} + // Stop stop this func (d *MediaStreamDuplicater) Stop() {