diff --git a/src/errors.rs b/src/errors.rs index 84b3f1a6..f4cb9294 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -105,6 +105,10 @@ custom_error! { CannotConnect{error: String} = "Cannot connect due to a websocket error: {error}", NonHelloOnInitiate{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong", + // Errors for the in-place-events api + /// Server did not respond to our request in time + NoResponse = "Server did not respond in time", + // Other misc errors UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}", } diff --git a/src/gateway/events.rs b/src/gateway/events.rs index 41780503..664a0501 100644 --- a/src/gateway/events.rs +++ b/src/gateway/events.rs @@ -51,6 +51,7 @@ pub struct Session { pub replace: Publisher, pub reconnect: Publisher, pub invalid: Publisher, + pub resumed: Publisher, } #[derive(Default, Debug)] diff --git a/src/gateway/gateway.rs b/src/gateway/gateway.rs index 98ac9cea..98d0dea4 100644 --- a/src/gateway/gateway.rs +++ b/src/gateway/gateway.rs @@ -118,7 +118,10 @@ impl Gateway { serde_json::from_str(&message.0).unwrap(); if gateway_payload.op_code != (Opcode::Hello as u8) { - warn!("GW: Received a non-hello opcode ({}) on gateway init", gateway_payload.op_code); + warn!( + "GW: Received a non-hello opcode ({}) on gateway init", + gateway_payload.op_code + ); return Err(GatewayError::NonHelloOnInitiate { opcode: gateway_payload.op_code, }); @@ -357,7 +360,7 @@ impl Gateway { return; } - let op_code = op_code_res.unwrap(); + let op_code = op_code_res.unwrap(); match op_code { // An event was dispatched, we need to look at the gateway event name t @@ -413,7 +416,6 @@ impl Gateway { } } },)* - "RESUMED" => (), "SESSIONS_REPLACE" => { let json = gateway_payload.event_data.unwrap().get(); let result: Result, serde_json::Error> = serde_json::from_str(json); @@ -505,6 +507,7 @@ impl Gateway { "RECENT_MENTION_DELETE" => message.recent_mention_delete, "MESSAGE_ACK" => message.ack, "PRESENCE_UPDATE" => user.presence_update, // TODO + "RESUMED" => session.resumed, "RELATIONSHIP_ADD" => relationship.add, "RELATIONSHIP_REMOVE" => relationship.remove, "STAGE_INSTANCE_CREATE" => stage_instance.create, diff --git a/src/gateway/handle.rs b/src/gateway/handle.rs index f165d40b..dc2fc654 100644 --- a/src/gateway/handle.rs +++ b/src/gateway/handle.rs @@ -8,7 +8,12 @@ use log::*; use std::fmt::Debug; use super::{events::Events, *}; -use crate::types::{self, Composite, Opcode, Shared}; +use crate::types::{self, Composite, GuildMembersChunk, Opcode, Shared, VoiceStateUpdate}; + +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::sleep; +#[cfg(target_arch = "wasm32")] +use wasmtimer::tokio::sleep; /// Represents a handle to a Gateway connection. /// @@ -106,21 +111,111 @@ impl GatewayHandle { } /// Sends an identify event ([types::GatewayIdentifyPayload]) to the gateway + /// + /// Fires off a [types::GatewayReady] event pub async fn send_identify(&self, to_send: types::GatewayIdentifyPayload) { let to_send_value = serde_json::to_value(&to_send).unwrap(); trace!("GW: Sending Identify.."); - self.send_json_event(Opcode::Identify as u8, to_send_value).await; + self.send_json_event(Opcode::Identify as u8, to_send_value) + .await; + } + + /// Sends an identify event ([types::GatewayIdentifyPayload]) to the gateway and + /// waits to receive a [types::GatewayReady] event. + /// + /// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of + /// waiting + pub async fn identify( + &self, + to_send: types::GatewayIdentifyPayload, + ) -> Result { + self.send_identify(to_send).await; + + let (observer, receiver) = OneshotEventObserver::::new(); + + self.events + .lock() + .await + .session + .ready + .subscribe(observer.clone()); + + tokio::select! { + () = sleep(std::time::Duration::from_secs(5)) => { + // Timeout + self.events.lock().await.session.ready.unsubscribe(observer); + Err(GatewayError::NoResponse) + } + result = receiver => { + match result { + Ok(event) => { + self.events.lock().await.session.ready.unsubscribe(observer); + Ok(event) + } + Err(e) => { + warn!("Gateway in-place-events receive error: {:?}", e); + self.events.lock().await.session.ready.unsubscribe(observer); + Err(GatewayError::Unknown) + } + } + } + } } /// Sends a resume event ([types::GatewayResume]) to the gateway + /// + /// Fires off a [types::GatewayResumed] event after replaying missed events pub async fn send_resume(&self, to_send: types::GatewayResume) { let to_send_value = serde_json::to_value(&to_send).unwrap(); trace!("GW: Sending Resume.."); - self.send_json_event(Opcode::Resume as u8, to_send_value).await; + self.send_json_event(Opcode::Resume as u8, to_send_value) + .await; + } + + /// Sends a resume event ([types::GatewayResume]) to the gateway and + /// waits to receive a [types::GatewayResumed] event. + /// + /// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of + /// waiting + pub async fn resume( + &self, + to_send: types::GatewayResume, + ) -> Result { + self.send_resume(to_send).await; + + let (observer, receiver) = OneshotEventObserver::::new(); + + self.events + .lock() + .await + .session + .resumed + .subscribe(observer.clone()); + + tokio::select! { + () = sleep(std::time::Duration::from_secs(5)) => { + // Timeout + self.events.lock().await.session.resumed.unsubscribe(observer); + Err(GatewayError::NoResponse) + } + result = receiver => { + match result { + Ok(event) => { + self.events.lock().await.session.resumed.unsubscribe(observer); + Ok(event) + } + Err(e) => { + warn!("Gateway in-place-events receive error: {:?}", e); + self.events.lock().await.session.resumed.unsubscribe(observer); + Err(GatewayError::Unknown) + } + } + } + } } /// Sends an update presence event ([types::UpdatePresence]) to the gateway @@ -133,7 +228,9 @@ impl GatewayHandle { .await; } - /// Sends a request guild members ([types::GatewayRequestGuildMembers]) to the server + /// Sends a request guild members ([types::GatewayRequestGuildMembers]) event to the server + /// + /// Fires off one or more [types::GuildMembersChunk] pub async fn send_request_guild_members(&self, to_send: types::GatewayRequestGuildMembers) { let to_send_value = serde_json::to_value(&to_send).unwrap(); @@ -143,7 +240,61 @@ impl GatewayHandle { .await; } - /// Sends an update voice state ([types::UpdateVoiceState]) to the server + /// Sends a request guild members ([types::GatewayRequestGuildMembers]) event to the server and + /// waits to receive all [types::GuildMembersChunk]s + /// + /// Returns [GatewayError::NoResponse] if the server sends no response after 5 seconds of + /// waiting + pub async fn request_guild_members( + &self, + to_send: types::GatewayRequestGuildMembers, + ) -> Result, GatewayError> { + self.send_request_guild_members(to_send).await; + + let (observer, mut receiver) = BroadcastEventObserver::::new(32); + + self.events + .lock() + .await + .guild + .members_chunk + .subscribe(observer.clone()); + + let mut chunks = Vec::new(); + + loop { + tokio::select! { + () = sleep(std::time::Duration::from_secs(5)) => { + // Timeout + self.events.lock().await.guild.members_chunk.unsubscribe(observer); + return Err(GatewayError::NoResponse); + } + result = receiver.recv() => { + match result { + Ok(event) => { + let remaining = event.chunk_count - (event.chunk_index + 1); + + chunks.push(event); + + if remaining < 1 { + self.events.lock().await.guild.members_chunk.unsubscribe(observer); + return Ok(chunks); + } + } + Err(e) => { + warn!("Gateway in-place-events receive error: {:?}", e); + self.events.lock().await.guild.members_chunk.unsubscribe(observer); + return Err(GatewayError::Unknown); + } + } + } + } + } + } + + /// Sends an update voice state ([types::UpdateVoiceState]) event to the server + /// + /// Fires a [types::VoiceStateUpdate] event if the user left or joined a different channel pub async fn send_update_voice_state(&self, to_send: types::UpdateVoiceState) { let to_send_value = serde_json::to_value(to_send).unwrap(); @@ -153,21 +304,67 @@ impl GatewayHandle { .await; } + /// Sends an update voice state ([types::UpdateVoiceState]) event to the server and + /// waits to receive a [types::VoiceStateUpdate] event + /// + /// Returns [None] if the server sends no response after a second of + /// waiting + /// + /// Note that not receiving a response is normal behaviour if the user didn't leave or join a + /// new voice channel + pub async fn update_voice_state( + &self, + to_send: types::UpdateVoiceState, + ) -> Option { + self.send_update_voice_state(to_send).await; + + let (observer, receiver) = OneshotEventObserver::::new(); + + self.events + .lock() + .await + .voice + .state_update + .subscribe(observer.clone()); + + tokio::select! { + () = sleep(std::time::Duration::from_secs(1)) => { + // Timeout + self.events.lock().await.voice.state_update.unsubscribe(observer); + None + } + result = receiver => { + match result { + Ok(event) => { + self.events.lock().await.voice.state_update.unsubscribe(observer); + Some(event) + } + Err(e) => { + warn!("Gateway in-place-events receive error: {:?}", e); + self.events.lock().await.voice.state_update.unsubscribe(observer); + None + } + } + } + } + } + /// Sends a call sync ([types::CallSync]) to the server pub async fn send_call_sync(&self, to_send: types::CallSync) { let to_send_value = serde_json::to_value(to_send).unwrap(); trace!("GW: Sending Call Sync.."); - self.send_json_event(Opcode::CallConnect as u8, to_send_value).await; + self.send_json_event(Opcode::CallConnect as u8, to_send_value) + .await; } - /// Sends a request call connect event (aka [types::CallSync]) to the server - /// - /// # Notes - /// Alias of [Self::send_call_sync] + /// Sends a request call connect event (aka [types::CallSync]) to the server + /// + /// # Notes + /// Alias of [Self::send_call_sync] pub async fn send_request_call_connect(&self, to_send: types::CallSync) { - self.send_call_sync(to_send).await + self.send_call_sync(to_send).await } /// Sends a Lazy Request ([types::LazyRequest]) to the server @@ -180,9 +377,9 @@ impl GatewayHandle { .await; } - /// Sends a Request Last Messages ([types::RequestLastMessages]) to the server - /// - /// The server should respond with a [types::LastMessages] event + /// Sends a Request Last Messages ([types::RequestLastMessages]) to the server + /// + /// Fires off a [types::LastMessages] event pub async fn send_request_last_messages(&self, to_send: types::RequestLastMessages) { let to_send_value = serde_json::to_value(&to_send).unwrap(); @@ -192,9 +389,51 @@ impl GatewayHandle { .await; } - /// Closes the websocket connection and stops all gateway tasks; + /// Sends a Request Last Messages ([types::RequestLastMessages]) event to the server and + /// waits to receive a [types::LastMessages] event + /// + /// Returns [None] if the server sends no response after 5 seconds of + /// waiting + pub async fn request_last_messages( + &self, + to_send: types::RequestLastMessages, + ) -> Result { + self.send_request_last_messages(to_send).await; + + let (observer, receiver) = OneshotEventObserver::::new(); + + self.events + .lock() + .await + .message + .last_messages + .subscribe(observer.clone()); + + tokio::select! { + () = sleep(std::time::Duration::from_secs(5)) => { + // Timeout + self.events.lock().await.message.last_messages.unsubscribe(observer); + Err(GatewayError::NoResponse) + } + result = receiver => { + match result { + Ok(event) => { + self.events.lock().await.message.last_messages.unsubscribe(observer); + Ok(event) + } + Err(e) => { + warn!("Gateway in-place-events receive error: {:?}", e); + self.events.lock().await.message.last_messages.unsubscribe(observer); + Err(GatewayError::Unknown) + } + } + } + } + } + + /// Closes the websocket connection and stops all gateway tasks. /// - /// Essentially pulls the plug on the gateway, leaving it possible to resume; + /// Essentially pulls the plug on the gateway, leaving it possible to resume pub async fn close(&self) { self.kill_send.send(()).unwrap(); self.websocket_send.lock().await.close().await.unwrap(); diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 98d34a9b..1bcebaa6 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -10,6 +10,7 @@ pub mod gateway; pub mod handle; pub mod heartbeat; pub mod message; +pub mod observers; pub mod options; pub use backends::*; @@ -17,6 +18,7 @@ pub use gateway::*; pub use handle::*; use heartbeat::*; pub use message::*; +pub use observers::*; pub use options::*; use crate::errors::GatewayError; diff --git a/src/gateway/observers.rs b/src/gateway/observers.rs new file mode 100644 index 00000000..009df6a3 --- /dev/null +++ b/src/gateway/observers.rs @@ -0,0 +1,246 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +//! Includes pre-made observers to use with gateway events + +use crate::types::WebSocketEvent; +use async_trait::async_trait; +use log::warn; +use pubserve::Subscriber; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// Observes an event once and sends it via a [tokio::sync::oneshot] channel. +/// +/// # Examples +/// ``` +/// let handle: GatewayHandle; // Get this either by user.gateway or by manually opening a connection +/// +/// // Let's say we want to wait until the next MessageCreate event +/// // Create the observer and receiver +/// let (observer, receiver) = OneshotEventObserver::::new(); +/// +/// // Subscribe the observer, so it receives events +/// // Note that we clone the reference so we can later unsubscribe the observer +/// handle.events.lock().await.message.create.subscribe(observer.clone()); +/// +/// // Await the event +/// let result = receiver.await; +/// +/// match result { +/// Ok(event) => { +/// println!("Yay! we received the event!"); +/// } +/// Err(e) => { +/// println!("We sadly encountered an error: {:?}", e); +/// } +/// } +/// +/// // The observer has now served its purpose, unsubscribe it +/// handle.events.lock().await.message.create.unsubscribe(observer); +/// +/// // Since we dropped all the references to the observer, +/// // it is now deleted +/// ``` +/// +/// We can also use [tokio::select] to await with a timeout: +/// +/// ``` +/// let handle: GatewayHandle; // Get this either by user.gateway or by manually opening a connection +/// +/// // Let's say we want to wait until the next MessageCreate event, if it happens in the next 10 seconds +/// // Create the observer and receiver +/// let (observer, receiver) = OneshotEventObserver::::new(); +/// +/// // Subscribe the observer, so it receives events +/// // Note that we clone the reference so we can later unsubscribe the observer +/// handle.events.lock().await.message.create.subscribe(observer.clone()); +/// +/// tokio::select! { +/// () = sleep(Duration::from_secs(10)) => { +/// // No event happened in 10 seconds +/// } +/// result = receiver => { +/// match result { +/// Ok(event) => { +/// println!("Yay! we received the event!"); +/// } +/// Err(e) => { +/// println!("We sadly encountered an error: {:?}", e); +/// } +/// } +/// } +/// } +/// +/// // The observer has now served its purpose, unsubscribe it +/// handle.events.lock().await.message.create.unsubscribe(observer); +/// +/// // Since we dropped all the references to the observer, +/// // it is now deleted +/// ``` +#[derive(Debug)] +pub struct OneshotEventObserver +where + T: WebSocketEvent + Clone, +{ + pub sender: Mutex>>, +} + +impl OneshotEventObserver { + /// Creates a new [OneshotEventObserver] for the given event + /// + /// You must subscribe it to your gateway event; after receiving on the channel, + /// you should unsubscribe it if possible + pub fn new() -> ( + Arc>, + tokio::sync::oneshot::Receiver, + ) { + let (sender, receiver) = tokio::sync::oneshot::channel(); + + let observer = Arc::new(OneshotEventObserver { + sender: Mutex::new(Some(sender)), + }); + + (observer, receiver) + } +} + +#[async_trait] +impl Subscriber for OneshotEventObserver +where + T: WebSocketEvent + Clone, +{ + async fn update(&self, message: &T) { + let mut lock = self.sender.lock().await; + + if lock.is_none() { + warn!("OneshotEventObserver received event after closing channel!"); + return; + } + + let sender = lock.take().unwrap(); + + match sender.send(message.clone()) { + Ok(_) => {} + Err(e) => { + warn!("OneshotEventObserver failed to send event: {:?}", e); + } + } + } +} + +/// Observes an event indefinitely and sends it via a [tokio::sync::broadcast] channel +/// +/// # Examples +/// ``` +/// let handle: GatewayHandle; // Get this either by user.gateway or by manually opening a connection +/// +/// // Let's say we want to wait for every MessageCreate event +/// // Create the observer and receiver +/// let (observer, receiver) = BroadcastEventObserver::::new(); +/// +/// // Subscribe the observer, so it receives events +/// // Note that we clone the reference so we can later unsubscribe the observer +/// handle.events.lock().await.message.create.subscribe(observer.clone()); +/// +/// loop { +/// let result = receiver.recv().await; +/// +/// match result { +/// Ok(event) => { +/// println!("Yay! we received the event!"); +/// } +/// Err(e) => { +/// println!("We sadly encountered an error: {:?}", e); +/// break; +/// } +/// } +/// } +/// +/// // The observer has now served its purpose, unsubscribe it +/// handle.events.lock().await.message.create.unsubscribe(observer); +/// +/// // Since we dropped all the references to the observer, +/// // it is now deleted +/// ``` +/// +/// We can also use [tokio::select] to await with a timeout: +/// +/// ``` +/// let handle: GatewayHandle; // Get this either by user.gateway or by manually opening a connection +/// +/// // Let's say we want to wait for every MessageCreate event, if it takes less than 10 seconds +/// // Create the observer and receiver +/// let (observer, receiver) = BroadcastEventObserver::::new(); +/// +/// // Subscribe the observer, so it receives events +/// // Note that we clone the reference so we can later unsubscribe the observer +/// handle.events.lock().await.message.create.subscribe(observer.clone()); +/// +/// loop { +/// tokio::select! { +/// () = sleep(Duration::from_secs(10)) => { +/// println!("Waited for 10 seconds with no message, stopping"); +/// break; +/// } +/// result = receiver.recv() => { +/// match result { +/// Ok(event) => { +/// println!("Yay! we received the event!"); +/// } +/// Err(e) => { +/// println!("We sadly encountered an error: {:?}", e); +/// break; +/// } +/// } +/// } +/// } +/// } +/// +/// // The observer has now served its purpose, unsubscribe it +/// handle.events.lock().await.message.create.unsubscribe(observer); +/// +/// // Since we dropped all the references to the observer, +/// // it is now deleted +/// ``` +#[derive(Debug)] +pub struct BroadcastEventObserver +where + T: WebSocketEvent + Clone, +{ + pub sender: tokio::sync::broadcast::Sender, +} + +impl BroadcastEventObserver { + /// Creates a new [BroadcastEventObserver] for the given event + /// + /// You must subscribe it to your gateway event + pub fn new( + channel_size: usize, + ) -> ( + Arc>, + tokio::sync::broadcast::Receiver, + ) { + let (sender, receiver) = tokio::sync::broadcast::channel(channel_size); + + let observer = Arc::new(BroadcastEventObserver { sender }); + + (observer, receiver) + } +} + +#[async_trait] +impl Subscriber for BroadcastEventObserver +where + T: WebSocketEvent + Clone, +{ + async fn update(&self, message: &T) { + match self.sender.send(message.clone()) { + Ok(_) => {} + Err(e) => { + warn!("BroadcastEventObserver failed to send event: {:?}", e); + } + } + } +} diff --git a/src/types/events/request_members.rs b/src/types/events/request_members.rs index 1c0beb02..d4a3c1e2 100644 --- a/src/types/events/request_members.rs +++ b/src/types/events/request_members.rs @@ -19,26 +19,30 @@ use serde::{Deserialize, Serialize}; /// # Reference /// See pub struct GatewayRequestGuildMembers { - /// Id(s) of the guild(s) to get members for + /// Id(s) of the guild(s) to get members for pub guild_id: OneOrMoreSnowflakes, - /// The user id(s) to request (0 - 100) + /// The user id(s) to request (0 - 100) pub user_ids: Option, - /// String that the username / nickname starts with, or an empty string for all members + /// String that the username / nickname starts with, or an empty string for all members + #[serde(skip_serializing_if = "Option::is_none")] pub query: Option, - /// Maximum number of members to send matching the query (0 - 100) - /// - /// Must be 0 with an empty query - pub limit: u8, + /// Maximum number of members to send matching the query (0 - 100) + /// + /// Must be 0 with an empty query + #[serde(skip_serializing_if = "Option::is_none")] + pub limit: Option, - /// Whether to return the [Presence](crate::types::events::PresenceUpdate) of the matched - /// members + /// Whether to return the [Presence](crate::types::events::PresenceUpdate) of the matched + /// members + #[serde(skip_serializing_if = "Option::is_none")] pub presences: Option, - /// Unique string to identify the received event for this specific request. - /// - /// Up to 32 bytes. If you send a longer nonce, it will be ignored + /// Unique string to identify the received event for this specific request. + /// + /// Up to 32 bytes. If you send a longer nonce, it will be ignored + #[serde(skip_serializing_if = "Option::is_none")] pub nonce: Option, } diff --git a/src/types/events/resume.rs b/src/types/events/resume.rs index 2485dc3c..3b313170 100644 --- a/src/types/events/resume.rs +++ b/src/types/events/resume.rs @@ -5,10 +5,27 @@ use crate::types::events::WebSocketEvent; use serde::{Deserialize, Serialize}; +/// Used to replay missed events when a disconnected client resumes. +/// +/// # Reference +/// See #[derive(Debug, Clone, Deserialize, Serialize, Default, WebSocketEvent)] pub struct GatewayResume { pub token: String, + /// Existing session id pub session_id: String, + /// Last sequence number received pub seq: String, } +/// Sent in response to a [GatewayResume]. +/// +/// Signifies the end of event replaying. +/// +/// # Reference +/// See +#[derive(Debug, Clone, Deserialize, Serialize, Default, WebSocketEvent)] +pub struct GatewayResumed { + #[serde(rename = "_trace")] + pub trace: Vec, +} diff --git a/tests/gateway.rs b/tests/gateway.rs index a48dc9db..ed4df344 100644 --- a/tests/gateway.rs +++ b/tests/gateway.rs @@ -11,8 +11,8 @@ use async_trait::async_trait; use chorus::errors::GatewayError; use chorus::gateway::*; use chorus::types::{ - self, Channel, ChannelCreateSchema, ChannelModifySchema, GatewayReady, IntoShared, - RoleCreateModifySchema, RoleObject, + self, Channel, ChannelCreateSchema, ChannelModifySchema, IntoShared, RoleCreateModifySchema, + RoleObject, }; use pubserve::Subscriber; #[cfg(target_arch = "wasm32")] @@ -37,18 +37,6 @@ async fn test_gateway_establish() { common::teardown(bundle).await } -#[derive(Debug)] -struct GatewayReadyObserver { - channel: tokio::sync::mpsc::Sender<()>, -} - -#[async_trait] -impl Subscriber for GatewayReadyObserver { - async fn update(&self, _data: &GatewayReady) { - self.channel.send(()).await.unwrap(); - } -} - #[derive(Debug)] struct GatewayErrorObserver { channel: tokio::sync::mpsc::Sender, @@ -71,33 +59,15 @@ async fn test_gateway_authenticate() { .await .unwrap(); - let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1); - - let observer = Arc::new(GatewayReadyObserver { - channel: ready_send, - }); - - gateway - .events - .lock() - .await - .session - .ready - .subscribe(observer); - let mut identify = types::GatewayIdentifyPayload::common(); identify.token = bundle.user.token.clone(); - gateway.send_identify(identify).await; - - tokio::select! { - // Fail, we timed out waiting for it - () = sleep(Duration::from_secs(20)) => { - println!("Timed out waiting for event, failing.."); + match gateway.identify(identify).await { + Ok(_) => {} + Err(e) => { + println!("Failed to identify with error {}, failing", e); assert!(false); } - // Success, we have received it - Some(_) = ready_receive.recv() => {} } common::teardown(bundle).await @@ -123,20 +93,7 @@ async fn test_gateway_errors() { .await .unwrap(); - // First we'll authenticate, wait for ready, and then authenticate again to get AlreadyAuthenticated - let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1); - - let observer = Arc::new(GatewayReadyObserver { - channel: ready_send, - }); - - gateway - .events - .lock() - .await - .session - .ready - .subscribe(observer); + // First we'll authenticate, then authenticate again to get AlreadyAuthenticated let (error_send, mut error_receive) = tokio::sync::mpsc::channel(1); @@ -149,24 +106,19 @@ async fn test_gateway_errors() { let mut identify = types::GatewayIdentifyPayload::common(); identify.token = bundle.user.token.clone(); - // Identify and wait to receive ready - gateway.send_identify(identify.clone()).await; - - tokio::select! { - // Fail, we timed out waiting for it - () = sleep(Duration::from_secs(20)) => { - println!("Timed out waiting for ready, failing.."); + match gateway.identify(identify.clone()).await { + Ok(_) => {} + Err(e) => { + println!("Failed to identify with error {}, failing", e); assert!(false); } - // Success, we have received it - Some(_) = ready_receive.recv() => {} } // Identify again, so we should receive already authenticated gateway.send_identify(identify).await; tokio::select! { - // Fail, we timed out waiting for it + // Fail, we timed out waiting for the error () = sleep(Duration::from_secs(20)) => { assert!(false); }