Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out web media connection into model::connection::Connection #105

Merged
merged 4 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 30 additions & 293 deletions yew-ui/src/components/attendants.rs

Large diffs are not rendered by default.

105 changes: 105 additions & 0 deletions yew-ui/src/model/connection/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//
// Connection struct wraps the lower-level "Task" (task.rs), providing a heartbeat and keeping
// track of connection status.
//
use super::task::Task;
use super::ConnectOptions;
use gloo::timers::callback::Interval;
use std::cell::Cell;
use std::sync::Arc;
use types::protos::media_packet::media_packet::MediaType;
use types::protos::media_packet::MediaPacket;
use yew::prelude::Callback;

#[derive(Clone, Copy)]
enum Status {
Connecting,
Connected,
Closed,
}

pub struct Connection {
task: Arc<Task>,
heartbeat: Option<Interval>,
status: Arc<Cell<Status>>,
}

impl Connection {
pub fn connect(webtransport: bool, options: ConnectOptions) -> anyhow::Result<Self> {
let mut options = options.clone();
let userid = options.userid.clone();
let status = Arc::new(Cell::new(Status::Connecting));
{
let status = Arc::clone(&status);
options.on_connected = tap_callback(
options.on_connected,
Callback::from(move |_| status.set(Status::Connected)),
);
}
{
let status = Arc::clone(&status);
options.on_connection_lost = tap_callback(
options.on_connection_lost,
Callback::from(move |_| status.set(Status::Closed)),
);
}
let mut connection = Self {
task: Arc::new(Task::connect(webtransport, options)?),
heartbeat: None,
status,
};
connection.start_heartbeat(userid);
Ok(connection)
}

pub fn is_connected(&self) -> bool {
match self.status.get() {
Status::Connected => true,
_ => false,
}
}

fn start_heartbeat(&mut self, userid: String) {
let task = Arc::clone(&self.task);
let status = Arc::clone(&self.status);
self.heartbeat = Some(Interval::new(1000, move || {
let packet = MediaPacket {
media_type: MediaType::HEARTBEAT.into(),
email: userid.clone(),
timestamp: js_sys::Date::now(),
..Default::default()
};
if let Status::Connected = status.get() {
task.send_packet(packet);
}
}));
}

fn stop_heartbeat(&mut self) {
if let Some(heartbeat) = self.heartbeat.take() {
heartbeat.cancel();
}
}

pub fn send_packet(&self, packet: MediaPacket) {
if let Status::Connected = self.status.get() {
self.task.send_packet(packet);
}
}
}

impl Drop for Connection {
fn drop(&mut self) {
self.stop_heartbeat();
}
}

fn tap_callback<IN: 'static, OUT: 'static>(
callback: Callback<IN, OUT>,
tap: Callback<()>,
) -> Callback<IN, OUT> {
Callback::from(move |arg| {
tap.emit(());
callback.emit(arg)
})
}
8 changes: 8 additions & 0 deletions yew-ui/src/model/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mod connection;
mod task;
mod webmedia;
mod websocket;
mod webtransport;

pub use connection::Connection;
pub use webmedia::ConnectOptions;
35 changes: 35 additions & 0 deletions yew-ui/src/model/connection/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Generic Task that can be a WebSocketTask or WebTransportTask.
//
// Handles rollover of connection from WebTransport to WebSocket
//
use gloo_console::log;
use types::protos::media_packet::MediaPacket;
use yew_websocket::websocket::WebSocketTask;
use yew_webtransport::webtransport::WebTransportTask;

use super::webmedia::{ConnectOptions, WebMedia};

pub(super) enum Task {
WebSocket(WebSocketTask),
WebTransport(WebTransportTask),
}

impl Task {
pub fn connect(webtransport: bool, options: ConnectOptions) -> anyhow::Result<Self> {
if webtransport {
match WebTransportTask::connect(options.clone()) {
Ok(task) => return Ok(Task::WebTransport(task)),
Err(_e) => log!("WebTransport connect failed, falling back to WebSocket"),
}
}
WebSocketTask::connect(options.clone()).map(|task| Task::WebSocket(task))
}

pub fn send_packet(&self, packet: MediaPacket) {
match self {
Task::WebSocket(ws) => ws.send_packet(packet),
Task::WebTransport(wt) => wt.send_packet(packet),
}
}
}
43 changes: 43 additions & 0 deletions yew-ui/src/model/connection/webmedia.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Defines trait giving a consistent interface for making and using connections, at the level of
// MediaPackets
//
// Implemented both for WebSockets (websocket.rs) and WebTransport (webtransport.rs)
//
use super::super::MediaPacketWrapper;
use gloo_console::log;
use protobuf::Message;
use types::protos::media_packet::MediaPacket;
use wasm_bindgen::JsValue;
use yew::prelude::Callback;

#[derive(Clone)]
pub struct ConnectOptions {
pub userid: String,
pub websocket_url: String,
pub webtransport_url: String,
pub on_inbound_media: Callback<MediaPacketWrapper>,
pub on_connected: Callback<()>,
pub on_connection_lost: Callback<()>,
}

pub(super) trait WebMedia<TASK> {
fn connect(options: ConnectOptions) -> anyhow::Result<TASK>;
fn send_bytes(&self, bytes: Vec<u8>);

fn send_packet(&self, packet: MediaPacket) {
match packet
.write_to_bytes()
.map_err(|w| JsValue::from(format!("{:?}", w)))
{
Ok(bytes) => self.send_bytes(bytes),
Err(e) => {
let packet_type = packet.media_type.enum_value_or_default();
log!(
"error sending {} packet: {:?}",
JsValue::from(format!("{}", packet_type)),
e
);
}
}
}
}
27 changes: 27 additions & 0 deletions yew-ui/src/model/connection/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// This submodule implements our WebMedia trait for WebSocketTask.
//
use super::webmedia::{ConnectOptions, WebMedia};
use gloo_console::log;
use yew::prelude::Callback;
use yew_websocket::websocket::{WebSocketService, WebSocketStatus, WebSocketTask};

impl WebMedia<WebSocketTask> for WebSocketTask {
fn connect(options: ConnectOptions) -> anyhow::Result<WebSocketTask> {
let notification = Callback::from(move |status| match status {
WebSocketStatus::Opened => options.on_connected.emit(()),
WebSocketStatus::Closed | WebSocketStatus::Error => options.on_connection_lost.emit(()),
});
log!("Connecting to ", &options.websocket_url);
let task = WebSocketService::connect(
&options.websocket_url,
options.on_inbound_media,
notification,
)?;
Ok(task)
}

fn send_bytes(&self, bytes: Vec<u8>) {
self.send_binary(bytes);
}
}
Loading