Skip to content

Commit

Permalink
Add api for awaiting responses to gateway send events (#591)
Browse files Browse the repository at this point in the history
* feat: add api for awaiting responses to send events

* docs: add examples for observers
  • Loading branch information
kozabrada123 authored Jan 5, 2025
1 parent c4407ef commit 5d4dc2c
Show file tree
Hide file tree
Showing 9 changed files with 559 additions and 91 deletions.
4 changes: 4 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ custom_error! {
CannotConnect{error: String} = "Cannot connect due to a websocket error: {error}",
NonHelloOnInitiate{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong",

// Errors for the in-place-events api
/// Server did not respond to our request in time
NoResponse = "Server did not respond in time",

// Other misc errors
UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}",
}
Expand Down
1 change: 1 addition & 0 deletions src/gateway/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct Session {
pub replace: Publisher<types::SessionsReplace>,
pub reconnect: Publisher<types::GatewayReconnect>,
pub invalid: Publisher<types::GatewayInvalidSession>,
pub resumed: Publisher<types::GatewayResumed>,
}

#[derive(Default, Debug)]
Expand Down
9 changes: 6 additions & 3 deletions src/gateway/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ impl Gateway {
serde_json::from_str(&message.0).unwrap();

if gateway_payload.op_code != (Opcode::Hello as u8) {
warn!("GW: Received a non-hello opcode ({}) on gateway init", gateway_payload.op_code);
warn!(
"GW: Received a non-hello opcode ({}) on gateway init",
gateway_payload.op_code
);
return Err(GatewayError::NonHelloOnInitiate {
opcode: gateway_payload.op_code,
});
Expand Down Expand Up @@ -357,7 +360,7 @@ impl Gateway {
return;
}

let op_code = op_code_res.unwrap();
let op_code = op_code_res.unwrap();

match op_code {
// An event was dispatched, we need to look at the gateway event name t
Expand Down Expand Up @@ -413,7 +416,6 @@ impl Gateway {
}
}
},)*
"RESUMED" => (),
"SESSIONS_REPLACE" => {
let json = gateway_payload.event_data.unwrap().get();
let result: Result<Vec<types::Session>, serde_json::Error> = serde_json::from_str(json);
Expand Down Expand Up @@ -505,6 +507,7 @@ impl Gateway {
"RECENT_MENTION_DELETE" => message.recent_mention_delete,
"MESSAGE_ACK" => message.ack,
"PRESENCE_UPDATE" => user.presence_update, // TODO
"RESUMED" => session.resumed,
"RELATIONSHIP_ADD" => relationship.add,
"RELATIONSHIP_REMOVE" => relationship.remove,
"STAGE_INSTANCE_CREATE" => stage_instance.create,
Expand Down
271 changes: 255 additions & 16 deletions src/gateway/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ use log::*;
use std::fmt::Debug;

use super::{events::Events, *};
use crate::types::{self, Composite, Opcode, Shared};
use crate::types::{self, Composite, GuildMembersChunk, Opcode, Shared, VoiceStateUpdate};

#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;

/// Represents a handle to a Gateway connection.
///
Expand Down Expand Up @@ -106,21 +111,111 @@ impl GatewayHandle {
}

/// Sends an identify event ([types::GatewayIdentifyPayload]) to the gateway
///
/// Fires off a [types::GatewayReady] event
pub async fn send_identify(&self, to_send: types::GatewayIdentifyPayload) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Identify..");

self.send_json_event(Opcode::Identify as u8, to_send_value).await;
self.send_json_event(Opcode::Identify as u8, to_send_value)
.await;
}

/// Sends an identify event ([types::GatewayIdentifyPayload]) to the gateway and
/// waits to receive a [types::GatewayReady] event.
///
/// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of
/// waiting
pub async fn identify(
&self,
to_send: types::GatewayIdentifyPayload,
) -> Result<types::GatewayReady, GatewayError> {
self.send_identify(to_send).await;

let (observer, receiver) = OneshotEventObserver::<types::GatewayReady>::new();

self.events
.lock()
.await
.session
.ready
.subscribe(observer.clone());

tokio::select! {
() = sleep(std::time::Duration::from_secs(5)) => {
// Timeout
self.events.lock().await.session.ready.unsubscribe(observer);
Err(GatewayError::NoResponse)
}
result = receiver => {
match result {
Ok(event) => {
self.events.lock().await.session.ready.unsubscribe(observer);
Ok(event)
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.session.ready.unsubscribe(observer);
Err(GatewayError::Unknown)
}
}
}
}
}

/// Sends a resume event ([types::GatewayResume]) to the gateway
///
/// Fires off a [types::GatewayResumed] event after replaying missed events
pub async fn send_resume(&self, to_send: types::GatewayResume) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Resume..");

self.send_json_event(Opcode::Resume as u8, to_send_value).await;
self.send_json_event(Opcode::Resume as u8, to_send_value)
.await;
}

/// Sends a resume event ([types::GatewayResume]) to the gateway and
/// waits to receive a [types::GatewayResumed] event.
///
/// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of
/// waiting
pub async fn resume(
&self,
to_send: types::GatewayResume,
) -> Result<types::GatewayResumed, GatewayError> {
self.send_resume(to_send).await;

let (observer, receiver) = OneshotEventObserver::<types::GatewayResumed>::new();

self.events
.lock()
.await
.session
.resumed
.subscribe(observer.clone());

tokio::select! {
() = sleep(std::time::Duration::from_secs(5)) => {
// Timeout
self.events.lock().await.session.resumed.unsubscribe(observer);
Err(GatewayError::NoResponse)
}
result = receiver => {
match result {
Ok(event) => {
self.events.lock().await.session.resumed.unsubscribe(observer);
Ok(event)
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.session.resumed.unsubscribe(observer);
Err(GatewayError::Unknown)
}
}
}
}
}

/// Sends an update presence event ([types::UpdatePresence]) to the gateway
Expand All @@ -133,7 +228,9 @@ impl GatewayHandle {
.await;
}

/// Sends a request guild members ([types::GatewayRequestGuildMembers]) to the server
/// Sends a request guild members ([types::GatewayRequestGuildMembers]) event to the server
///
/// Fires off one or more [types::GuildMembersChunk]
pub async fn send_request_guild_members(&self, to_send: types::GatewayRequestGuildMembers) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

Expand All @@ -143,7 +240,61 @@ impl GatewayHandle {
.await;
}

/// Sends an update voice state ([types::UpdateVoiceState]) to the server
/// Sends a request guild members ([types::GatewayRequestGuildMembers]) event to the server and
/// waits to receive all [types::GuildMembersChunk]s
///
/// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of
/// waiting
pub async fn request_guild_members(
&self,
to_send: types::GatewayRequestGuildMembers,
) -> Result<Vec<GuildMembersChunk>, GatewayError> {
self.send_request_guild_members(to_send).await;

let (observer, mut receiver) = BroadcastEventObserver::<GuildMembersChunk>::new(32);

self.events
.lock()
.await
.guild
.members_chunk
.subscribe(observer.clone());

let mut chunks = Vec::new();

loop {
tokio::select! {
() = sleep(std::time::Duration::from_secs(5)) => {
// Timeout
self.events.lock().await.guild.members_chunk.unsubscribe(observer);
return Err(GatewayError::NoResponse);
}
result = receiver.recv() => {
match result {
Ok(event) => {
let remaining = event.chunk_count - (event.chunk_index + 1);

chunks.push(event);

if remaining < 1 {
self.events.lock().await.guild.members_chunk.unsubscribe(observer);
return Ok(chunks);
}
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.guild.members_chunk.unsubscribe(observer);
return Err(GatewayError::Unknown);
}
}
}
}
}
}

/// Sends an update voice state ([types::UpdateVoiceState]) event to the server
///
/// Fires a [types::VoiceStateUpdate] event if the user left or joined a different channel
pub async fn send_update_voice_state(&self, to_send: types::UpdateVoiceState) {
let to_send_value = serde_json::to_value(to_send).unwrap();

Expand All @@ -153,21 +304,67 @@ impl GatewayHandle {
.await;
}

/// Sends an update voice state ([types::UpdateVoiceState]) event to the server and
/// waits to receive a [types::VoiceStateUpdate] event
///
/// Returns [None] if the server sends no response after a second of
/// waiting
///
/// Note that not receiving a response is normal behaviour if the user didn't leave or join a
/// new voice channel
pub async fn update_voice_state(
&self,
to_send: types::UpdateVoiceState,
) -> Option<VoiceStateUpdate> {
self.send_update_voice_state(to_send).await;

let (observer, receiver) = OneshotEventObserver::<VoiceStateUpdate>::new();

self.events
.lock()
.await
.voice
.state_update
.subscribe(observer.clone());

tokio::select! {
() = sleep(std::time::Duration::from_secs(1)) => {
// Timeout
self.events.lock().await.voice.state_update.unsubscribe(observer);
None
}
result = receiver => {
match result {
Ok(event) => {
self.events.lock().await.voice.state_update.unsubscribe(observer);
Some(event)
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.voice.state_update.unsubscribe(observer);
None
}
}
}
}
}

/// Sends a call sync ([types::CallSync]) to the server
pub async fn send_call_sync(&self, to_send: types::CallSync) {
let to_send_value = serde_json::to_value(to_send).unwrap();

trace!("GW: Sending Call Sync..");

self.send_json_event(Opcode::CallConnect as u8, to_send_value).await;
self.send_json_event(Opcode::CallConnect as u8, to_send_value)
.await;
}

/// Sends a request call connect event (aka [types::CallSync]) to the server
///
/// # Notes
/// Alias of [Self::send_call_sync]
/// Sends a request call connect event (aka [types::CallSync]) to the server
///
/// # Notes
/// Alias of [Self::send_call_sync]
pub async fn send_request_call_connect(&self, to_send: types::CallSync) {
self.send_call_sync(to_send).await
self.send_call_sync(to_send).await
}

/// Sends a Lazy Request ([types::LazyRequest]) to the server
Expand All @@ -180,9 +377,9 @@ impl GatewayHandle {
.await;
}

/// Sends a Request Last Messages ([types::RequestLastMessages]) to the server
///
/// The server should respond with a [types::LastMessages] event
/// Sends a Request Last Messages ([types::RequestLastMessages]) to the server
///
/// Fires off a [types::LastMessages] event
pub async fn send_request_last_messages(&self, to_send: types::RequestLastMessages) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

Expand All @@ -192,9 +389,51 @@ impl GatewayHandle {
.await;
}

/// Closes the websocket connection and stops all gateway tasks;
/// Sends a Request Last Messages ([types::RequestLastMessages]) event to the server and
/// waits to receive a [types::LastMessages] event
///
/// Returns [None] if the server sends no response after 5 seconds of
/// waiting
pub async fn request_last_messages(
&self,
to_send: types::RequestLastMessages,
) -> Result<types::LastMessages, GatewayError> {
self.send_request_last_messages(to_send).await;

let (observer, receiver) = OneshotEventObserver::<types::LastMessages>::new();

self.events
.lock()
.await
.message
.last_messages
.subscribe(observer.clone());

tokio::select! {
() = sleep(std::time::Duration::from_secs(5)) => {
// Timeout
self.events.lock().await.message.last_messages.unsubscribe(observer);
Err(GatewayError::NoResponse)
}
result = receiver => {
match result {
Ok(event) => {
self.events.lock().await.message.last_messages.unsubscribe(observer);
Ok(event)
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.message.last_messages.unsubscribe(observer);
Err(GatewayError::Unknown)
}
}
}
}
}

/// Closes the websocket connection and stops all gateway tasks.
///
/// Essentially pulls the plug on the gateway, leaving it possible to resume;
/// Essentially pulls the plug on the gateway, leaving it possible to resume
pub async fn close(&self) {
self.kill_send.send(()).unwrap();
self.websocket_send.lock().await.close().await.unwrap();
Expand Down
Loading

0 comments on commit 5d4dc2c

Please sign in to comment.