Skip to content

Commit

Permalink
Merge pull request #12 from ported-pw/multi-reponse-jobs
Browse files Browse the repository at this point in the history
Correctly handle jobs that can have multiple responses
  • Loading branch information
icewind1991 authored Sep 28, 2024
2 parents af660a5 + 33c6451 commit f802076
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 1 deletion.
6 changes: 6 additions & 0 deletions protobuf/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ impl<T: MsgKindEnum> PartialEq<T> for MsgKind {
self.0.eq(&other.enum_value())
}
}

/// Trait for implementing a check for completion of a job that returns a response stream
pub trait JobMultiple {
/// If the job has completed
fn completed(&self) -> bool;
}
8 changes: 8 additions & 0 deletions protobuf/steam/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
mod generated;

pub use generated::*;

impl steam_vent_proto_common::JobMultiple
for steammessages_clientserver_appinfo::CMsgClientPICSProductInfoResponse
{
fn completed(&self) -> bool {
!self.response_pending.unwrap_or(false)
}
}
20 changes: 20 additions & 0 deletions src/connection/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tracing::{debug, error};
#[derive(Clone)]
pub struct MessageFilter {
job_id_filters: Arc<DashMap<JobId, oneshot::Sender<RawNetMessage>>>,
job_id_multi_filters: Arc<DashMap<JobId, broadcast::Sender<RawNetMessage>>>,
notification_filters: Arc<DashMap<&'static str, broadcast::Sender<ServiceMethodNotification>>>,
kind_filters: Arc<DashMap<MsgKind, broadcast::Sender<RawNetMessage>>>,
oneshot_kind_filters: Arc<DashMap<MsgKind, oneshot::Sender<RawNetMessage>>>,
Expand All @@ -26,6 +27,7 @@ impl MessageFilter {
) -> Self {
let filter = MessageFilter {
job_id_filters: Default::default(),
job_id_multi_filters: Default::default(),
kind_filters: Default::default(),
notification_filters: Default::default(),
oneshot_kind_filters: Default::default(),
Expand All @@ -42,6 +44,12 @@ impl MessageFilter {
.remove(&message.header.target_job_id)
{
tx.send(message).ok();
} else if let Some(map_ref) = filter_send
.job_id_multi_filters
.get(&message.header.target_job_id)
{
let tx = map_ref.value();
tx.send(message).ok();
} else if let Some((_, tx)) =
filter_send.oneshot_kind_filters.remove(&message.kind)
{
Expand Down Expand Up @@ -82,6 +90,18 @@ impl MessageFilter {
rx
}

pub fn on_job_id_multi(&self, id: JobId) -> broadcast::Receiver<RawNetMessage> {
let map_ref = self
.job_id_multi_filters
.entry(id)
.or_insert_with(|| broadcast::channel(16).0);
map_ref.subscribe()
}

pub fn complete_job_id_multi(&self, id: JobId) {
self.job_id_multi_filters.remove(&id);
}

pub fn on_notification(
&self,
job_name: &'static str,
Expand Down
32 changes: 31 additions & 1 deletion src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::net::IpAddr;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use steam_vent_proto::MsgKindEnum;
use steam_vent_proto::{JobMultiple, MsgKindEnum};
use steamid_ng::{AccountType, SteamID};
use tokio::sync::Mutex;
use tokio::task::spawn;
Expand Down Expand Up @@ -300,6 +300,36 @@ pub trait ConnectionTrait: Sync + Debug {
}
}

fn job_multi<Msg: NetMessage, Rsp: NetMessage + JobMultiple>(
&self,
msg: Msg,
) -> impl Future<Output = Result<Vec<Rsp>>> + Send {
async {
let header = self.session().header(true);
let source_job_id = header.source_job_id;
let mut recv = self.filter().on_job_id_multi(source_job_id);
let messages = {
self.raw_send(header, msg).await?;
let mut messages = vec![];
loop {
let msg: Rsp = timeout(self.timeout(), recv.recv())
.await
.map_err(|_| NetworkError::Timeout)?
.map_err(|_| NetworkError::EOF)?
.into_message()?;
let completed = msg.completed();
messages.push(msg);
if completed {
break;
}
}
Ok(messages)
};
self.filter().complete_job_id_multi(source_job_id);
messages
}
}

#[instrument(skip(msg), fields(kind = ?Msg::KIND))]
fn send<Msg: NetMessage>(&self, msg: Msg) -> impl Future<Output = Result<()>> + Send {
self.raw_send(self.session().header(false), msg)
Expand Down

0 comments on commit f802076

Please sign in to comment.