diff --git a/sendrecv/gst/webrtc-sendrecv.py b/sendrecv/gst/webrtc-sendrecv.py index 083788d..e4c514b 100644 --- a/sendrecv/gst/webrtc-sendrecv.py +++ b/sendrecv/gst/webrtc-sendrecv.py @@ -28,6 +28,7 @@ def __init__(self, id_, peer_id, server): self.id_ = id_ self.conn = None self.pipe = None + self.bus = None self.webrtc = None self.peer_id = peer_id self.server = server or 'wss://webrtc.nirbheek.in:8443' @@ -105,6 +106,25 @@ def on_incoming_stream(self, _, pad): decodebin.sync_state_with_parent() self.webrtc.link(decodebin) + def poll_cb(self): + done = False + while self.bus.peek() != None: + msg = self.bus.pop() + + if msg.type == Gst.MessageType.ERROR: + err = msg.parse_error() + print ("ERROR!!") + print (err.gerror, err.debug) + done = True + elif msg == Gst.MessageType.EOS: + done = True + + if done: + pollfd = self.bus.get_pollfd() + asyncio.get_event_loop().remove_reader(pollfd.fd) + asyncio.ensure_future(self.conn.close()) + break + def start_pipeline(self): self.pipe = Gst.parse_launch(PIPELINE_DESC) self.webrtc = self.pipe.get_by_name('sendrecv') @@ -113,7 +133,11 @@ def start_pipeline(self): self.webrtc.connect('pad-added', self.on_incoming_stream) self.pipe.set_state(Gst.State.PLAYING) - async def handle_sdp(self, message): + self.bus = self.pipe.get_bus() + pollfd = self.bus.get_pollfd() + asyncio.get_event_loop().add_reader(pollfd.fd, self.poll_cb) + + async def handle_json(self, message): assert (self.webrtc) msg = json.loads(message) if 'sdp' in msg: @@ -144,7 +168,7 @@ async def loop(self): print (message) return 1 else: - await self.handle_sdp(message) + await self.handle_json(message) return 0