Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add api for awaiting responses to gateway send events #591

Merged
merged 4 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ custom_error! {
CannotConnect{error: String} = "Cannot connect due to a websocket error: {error}",
NonHelloOnInitiate{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong",

// Errors for the in-place-events api
/// Server did not respond to our request in time
NoResponse = "Server did not respond in time",

// Other misc errors
UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}",
}
Expand Down
1 change: 1 addition & 0 deletions src/gateway/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct Session {
pub replace: Publisher<types::SessionsReplace>,
pub reconnect: Publisher<types::GatewayReconnect>,
pub invalid: Publisher<types::GatewayInvalidSession>,
pub resumed: Publisher<types::GatewayResumed>,
}

#[derive(Default, Debug)]
Expand Down
9 changes: 6 additions & 3 deletions src/gateway/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ impl Gateway {
serde_json::from_str(&message.0).unwrap();

if gateway_payload.op_code != (Opcode::Hello as u8) {
warn!("GW: Received a non-hello opcode ({}) on gateway init", gateway_payload.op_code);
warn!(
"GW: Received a non-hello opcode ({}) on gateway init",
gateway_payload.op_code
);
return Err(GatewayError::NonHelloOnInitiate {
opcode: gateway_payload.op_code,
});
Expand Down Expand Up @@ -357,7 +360,7 @@ impl Gateway {
return;
}

let op_code = op_code_res.unwrap();
let op_code = op_code_res.unwrap();

match op_code {
// An event was dispatched, we need to look at the gateway event name t
Expand Down Expand Up @@ -413,7 +416,6 @@ impl Gateway {
}
}
},)*
"RESUMED" => (),
"SESSIONS_REPLACE" => {
let json = gateway_payload.event_data.unwrap().get();
let result: Result<Vec<types::Session>, serde_json::Error> = serde_json::from_str(json);
Expand Down Expand Up @@ -505,6 +507,7 @@ impl Gateway {
"RECENT_MENTION_DELETE" => message.recent_mention_delete,
"MESSAGE_ACK" => message.ack,
"PRESENCE_UPDATE" => user.presence_update, // TODO
"RESUMED" => session.resumed,
"RELATIONSHIP_ADD" => relationship.add,
"RELATIONSHIP_REMOVE" => relationship.remove,
"STAGE_INSTANCE_CREATE" => stage_instance.create,
Expand Down
271 changes: 255 additions & 16 deletions src/gateway/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
use std::fmt::Debug;

use super::{events::Events, *};
use crate::types::{self, Composite, Opcode, Shared};
use crate::types::{self, Composite, GuildMembersChunk, Opcode, Shared, VoiceStateUpdate};

#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;

/// Represents a handle to a Gateway connection.
///
Expand Down Expand Up @@ -106,21 +111,111 @@
}

/// Sends an identify event ([types::GatewayIdentifyPayload]) to the gateway
///
/// Fires off a [types::GatewayReady] event
pub async fn send_identify(&self, to_send: types::GatewayIdentifyPayload) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Identify..");

self.send_json_event(Opcode::Identify as u8, to_send_value).await;
self.send_json_event(Opcode::Identify as u8, to_send_value)
.await;
}

/// Sends an identify event ([types::GatewayIdentifyPayload]) to the gateway and
/// waits to receive a [types::GatewayReady] event.
///
/// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of
/// waiting
pub async fn identify(
&self,
to_send: types::GatewayIdentifyPayload,
) -> Result<types::GatewayReady, GatewayError> {
self.send_identify(to_send).await;

let (observer, receiver) = OneshotEventObserver::<types::GatewayReady>::new();

self.events
.lock()
.await
.session
.ready
.subscribe(observer.clone());

tokio::select! {
() = sleep(std::time::Duration::from_secs(5)) => {
// Timeout
self.events.lock().await.session.ready.unsubscribe(observer);
return Err(GatewayError::NoResponse);
Fixed Show fixed Hide fixed
}
result = receiver => {
match result {
Ok(event) => {
self.events.lock().await.session.ready.unsubscribe(observer);
return Ok(event);
Fixed Show fixed Hide fixed
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.session.ready.unsubscribe(observer);
return Err(GatewayError::Unknown);
Fixed Show fixed Hide fixed
}
}
}
}
}

/// Sends a resume event ([types::GatewayResume]) to the gateway
///
/// Fires off a [types::GatewayResumed] event after replaying missed events
pub async fn send_resume(&self, to_send: types::GatewayResume) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Resume..");

self.send_json_event(Opcode::Resume as u8, to_send_value).await;
self.send_json_event(Opcode::Resume as u8, to_send_value)
.await;
}

/// Sends a resume event ([types::GatewayResume]) to the gateway and
/// waits to receive a [types::GatewayResumed] event.
///
/// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of
/// waiting
pub async fn resume(
&self,
to_send: types::GatewayResume,
) -> Result<types::GatewayResumed, GatewayError> {
self.send_resume(to_send).await;

let (observer, receiver) = OneshotEventObserver::<types::GatewayResumed>::new();

self.events
.lock()
.await
.session
.resumed
.subscribe(observer.clone());

tokio::select! {
() = sleep(std::time::Duration::from_secs(5)) => {
// Timeout
self.events.lock().await.session.resumed.unsubscribe(observer);
return Err(GatewayError::NoResponse);
Fixed Show fixed Hide fixed
}
result = receiver => {
match result {
Ok(event) => {
self.events.lock().await.session.resumed.unsubscribe(observer);
return Ok(event);
Fixed Show fixed Hide fixed
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.session.resumed.unsubscribe(observer);
return Err(GatewayError::Unknown);
Fixed Show fixed Hide fixed
}
}
}
}
}

/// Sends an update presence event ([types::UpdatePresence]) to the gateway
Expand All @@ -133,7 +228,9 @@
.await;
}

/// Sends a request guild members ([types::GatewayRequestGuildMembers]) to the server
/// Sends a request guild members ([types::GatewayRequestGuildMembers]) event to the server
///
/// Fires off one or more [types::GuildMembersChunk]
pub async fn send_request_guild_members(&self, to_send: types::GatewayRequestGuildMembers) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

Expand All @@ -143,7 +240,61 @@
.await;
}

/// Sends an update voice state ([types::UpdateVoiceState]) to the server
/// Sends a request guild members ([types::GatewayRequestGuildMembers]) event to the server and
/// waits to receive all [types::GuildMembersChunk]s
///
/// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of
/// waiting
pub async fn request_guild_members(
&self,
to_send: types::GatewayRequestGuildMembers,
) -> Result<Vec<GuildMembersChunk>, GatewayError> {
self.send_request_guild_members(to_send).await;

let (observer, mut receiver) = BroadcastEventObserver::<GuildMembersChunk>::new(32);

self.events
.lock()
.await
.guild
.members_chunk
.subscribe(observer.clone());

let mut chunks = Vec::new();

loop {
tokio::select! {
() = sleep(std::time::Duration::from_secs(5)) => {
// Timeout
self.events.lock().await.guild.members_chunk.unsubscribe(observer);
return Err(GatewayError::NoResponse);
}
result = receiver.recv() => {
match result {
Ok(event) => {
let remaining = event.chunk_count - (event.chunk_index + 1);

chunks.push(event);

if remaining < 1 {
self.events.lock().await.guild.members_chunk.unsubscribe(observer);
return Ok(chunks);
}
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.guild.members_chunk.unsubscribe(observer);
return Err(GatewayError::Unknown);
}
}
}
}
}
}

/// Sends an update voice state ([types::UpdateVoiceState]) event to the server
///
/// Fires a [types::VoiceStateUpdate] event if the user left or joined a different channel
pub async fn send_update_voice_state(&self, to_send: types::UpdateVoiceState) {
let to_send_value = serde_json::to_value(to_send).unwrap();

Expand All @@ -153,21 +304,67 @@
.await;
}

/// Sends an update voice state ([types::UpdateVoiceState]) event to the server and
/// waits to receive a [types::VoiceStateUpdate] event
///
/// Returns [None] if the server sends no response after a second of
/// waiting
///
/// Note that not receiving a response is normal behaviour if the user didn't leave or join a
/// new voice channel
pub async fn update_voice_state(
&self,
to_send: types::UpdateVoiceState,
) -> Option<VoiceStateUpdate> {
self.send_update_voice_state(to_send).await;

let (observer, receiver) = OneshotEventObserver::<VoiceStateUpdate>::new();

self.events
.lock()
.await
.voice
.state_update
.subscribe(observer.clone());

tokio::select! {
() = sleep(std::time::Duration::from_secs(1)) => {
// Timeout
self.events.lock().await.voice.state_update.unsubscribe(observer);
return None;
Fixed Show fixed Hide fixed
}
result = receiver => {
match result {
Ok(event) => {
self.events.lock().await.voice.state_update.unsubscribe(observer);
return Some(event);
Fixed Show fixed Hide fixed
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.voice.state_update.unsubscribe(observer);
return None;
Fixed Show fixed Hide fixed
}
}
}
}
}

/// Sends a call sync ([types::CallSync]) to the server
pub async fn send_call_sync(&self, to_send: types::CallSync) {
let to_send_value = serde_json::to_value(to_send).unwrap();

trace!("GW: Sending Call Sync..");

self.send_json_event(Opcode::CallConnect as u8, to_send_value).await;
self.send_json_event(Opcode::CallConnect as u8, to_send_value)
.await;
}

/// Sends a request call connect event (aka [types::CallSync]) to the server
///
/// # Notes
/// Alias of [Self::send_call_sync]
/// Sends a request call connect event (aka [types::CallSync]) to the server
///
/// # Notes
/// Alias of [Self::send_call_sync]
pub async fn send_request_call_connect(&self, to_send: types::CallSync) {
self.send_call_sync(to_send).await
self.send_call_sync(to_send).await
}

/// Sends a Lazy Request ([types::LazyRequest]) to the server
Expand All @@ -180,9 +377,9 @@
.await;
}

/// Sends a Request Last Messages ([types::RequestLastMessages]) to the server
///
/// The server should respond with a [types::LastMessages] event
/// Sends a Request Last Messages ([types::RequestLastMessages]) to the server
///
/// Fires off a [types::LastMessages] event
pub async fn send_request_last_messages(&self, to_send: types::RequestLastMessages) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

Expand All @@ -192,9 +389,51 @@
.await;
}

/// Closes the websocket connection and stops all gateway tasks;
/// Sends a Request Last Messages ([types::RequestLastMessages]) event to the server and
/// waits to receive a [types::LastMessages] event
///
/// Returns [None] if the server sends no response after 5 seconds of
/// waiting
pub async fn request_last_messages(
&self,
to_send: types::RequestLastMessages,
) -> Result<types::LastMessages, GatewayError> {
self.send_request_last_messages(to_send).await;

let (observer, receiver) = OneshotEventObserver::<types::LastMessages>::new();

self.events
.lock()
.await
.message
.last_messages
.subscribe(observer.clone());

tokio::select! {
() = sleep(std::time::Duration::from_secs(5)) => {
// Timeout
self.events.lock().await.message.last_messages.unsubscribe(observer);
return Err(GatewayError::NoResponse);
Fixed Show fixed Hide fixed
}
result = receiver => {
match result {
Ok(event) => {
self.events.lock().await.message.last_messages.unsubscribe(observer);
return Ok(event);
Fixed Show fixed Hide fixed
}
Err(e) => {
warn!("Gateway in-place-events receive error: {:?}", e);
self.events.lock().await.message.last_messages.unsubscribe(observer);
return Err(GatewayError::Unknown);
Fixed Show fixed Hide fixed
}
}
}
}
}

/// Closes the websocket connection and stops all gateway tasks.
///
/// Essentially pulls the plug on the gateway, leaving it possible to resume;
/// Essentially pulls the plug on the gateway, leaving it possible to resume
pub async fn close(&self) {
self.kill_send.send(()).unwrap();
self.websocket_send.lock().await.close().await.unwrap();
Expand Down
Loading
Loading