Skip to content

Commit

Permalink
use mpsc for jobid_multi channels
Browse files Browse the repository at this point in the history
  • Loading branch information
icewind1991 committed Sep 28, 2024
1 parent f802076 commit b207711
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 10 deletions.
16 changes: 7 additions & 9 deletions src/connection/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::sync::Arc;
use steam_vent_proto::enums_clientserver::EMsg;
use steam_vent_proto::MsgKind;
use tokio::spawn;
use tokio::sync::{broadcast, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::StreamExt;
use tracing::{debug, error};

#[derive(Clone)]
pub struct MessageFilter {
job_id_filters: Arc<DashMap<JobId, oneshot::Sender<RawNetMessage>>>,
job_id_multi_filters: Arc<DashMap<JobId, broadcast::Sender<RawNetMessage>>>,
job_id_multi_filters: Arc<DashMap<JobId, mpsc::Sender<RawNetMessage>>>,
notification_filters: Arc<DashMap<&'static str, broadcast::Sender<ServiceMethodNotification>>>,
kind_filters: Arc<DashMap<MsgKind, broadcast::Sender<RawNetMessage>>>,
oneshot_kind_filters: Arc<DashMap<MsgKind, oneshot::Sender<RawNetMessage>>>,
Expand Down Expand Up @@ -49,7 +49,7 @@ impl MessageFilter {
.get(&message.header.target_job_id)
{
let tx = map_ref.value();
tx.send(message).ok();
tx.send(message).await.ok();
} else if let Some((_, tx)) =
filter_send.oneshot_kind_filters.remove(&message.kind)
{
Expand Down Expand Up @@ -90,12 +90,10 @@ impl MessageFilter {
rx
}

pub fn on_job_id_multi(&self, id: JobId) -> broadcast::Receiver<RawNetMessage> {
let map_ref = self
.job_id_multi_filters
.entry(id)
.or_insert_with(|| broadcast::channel(16).0);
map_ref.subscribe()
pub fn on_job_id_multi(&self, id: JobId) -> mpsc::Receiver<RawNetMessage> {
let (tx, rx) = mpsc::channel(16);
self.job_id_multi_filters.insert(id, tx);
rx
}

pub fn complete_job_id_multi(&self, id: JobId) {
Expand Down
2 changes: 1 addition & 1 deletion src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ pub trait ConnectionTrait: Sync + Debug {
let msg: Rsp = timeout(self.timeout(), recv.recv())
.await
.map_err(|_| NetworkError::Timeout)?
.map_err(|_| NetworkError::EOF)?
.ok_or(NetworkError::EOF)?
.into_message()?;
let completed = msg.completed();
messages.push(msg);
Expand Down

0 comments on commit b207711

Please sign in to comment.