Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

sendrecv.py: poll bus with asyncio #34

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions sendrecv/gst/webrtc-sendrecv.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(self, id_, peer_id, server):
self.id_ = id_
self.conn = None
self.pipe = None
self.bus = None
self.webrtc = None
self.peer_id = peer_id
self.server = server or 'wss://webrtc.nirbheek.in:8443'
Expand Down Expand Up @@ -105,6 +106,25 @@ def on_incoming_stream(self, _, pad):
decodebin.sync_state_with_parent()
self.webrtc.link(decodebin)

def poll_cb(self):
done = False
while self.bus.peek() != None:
msg = self.bus.pop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can directly pop or not? Without peek first

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that I could, as poll_cb will get called again immediately if all messages haven't been read, not sure what's best?


if msg.type == Gst.MessageType.ERROR:
err = msg.parse_error()
print ("ERROR!!")
print (err.gerror, err.debug)
done = True
elif msg == Gst.MessageType.EOS:
done = True

if done:
pollfd = self.bus.get_pollfd()
asyncio.get_event_loop().remove_reader(pollfd.fd)
asyncio.ensure_future(self.conn.close())
break

def start_pipeline(self):
self.pipe = Gst.parse_launch(PIPELINE_DESC)
self.webrtc = self.pipe.get_by_name('sendrecv')
Expand All @@ -113,7 +133,11 @@ def start_pipeline(self):
self.webrtc.connect('pad-added', self.on_incoming_stream)
self.pipe.set_state(Gst.State.PLAYING)

async def handle_sdp(self, message):
self.bus = self.pipe.get_bus()
pollfd = self.bus.get_pollfd()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second user of the new API I added in 1.14 :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I was pretty happy to see this had just been added, however I had to fix an annotation issue right after 1.14.2 was merged, which reminds me this will break unless master is used :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could put this in a try except block for now?

asyncio.get_event_loop().add_reader(pollfd.fd, self.poll_cb)

async def handle_json(self, message):
assert (self.webrtc)
msg = json.loads(message)
if 'sdp' in msg:
Expand Down Expand Up @@ -144,7 +168,7 @@ async def loop(self):
print (message)
return 1
else:
await self.handle_sdp(message)
await self.handle_json(message)
return 0


Expand Down