Skip to content

Commit

Permalink
Auto merge of #167 - spinda:async, r=jdm
Browse files Browse the repository at this point in the history
Make IpcReceiver a futures::Stream

Extracts the IpcReceiver half of #165.

This is placed behind an `async` feature so the dependency on futures isn't forced.
  • Loading branch information
bors-servo authored Aug 21, 2017
2 parents 36e7f5f + 484f2c4 commit 583f90c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository = "https://github.com/servo/ipc-channel"
force-inprocess = []
memfd = ["syscall"]
unstable = []
async = ["futures"]

[dependencies]
bincode = "0.8"
Expand All @@ -24,6 +25,7 @@ fnv = "1.0.3"
mio = "0.6.1"

syscall = { version = "0.2.1", optional = true }
futures = { version = "0.1", optional = true }

[dev-dependencies]
crossbeam = "0.2"
26 changes: 26 additions & 0 deletions src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;

#[cfg(feature = "async")]
use futures::{Async, Poll, Stream};
#[cfg(feature = "async")]
use std::io::ErrorKind;

thread_local! {
static OS_IPC_CHANNELS_FOR_DESERIALIZATION: RefCell<Vec<OsOpaqueIpcChannel>> =
RefCell::new(Vec::new())
Expand Down Expand Up @@ -86,6 +91,27 @@ impl<T> IpcReceiver<T> where T: for<'de> Deserialize<'de> + Serialize {
}
}

#[cfg(feature = "async")]
impl<T> Stream for IpcReceiver<T> where T: for<'de> Deserialize<'de> + Serialize {
type Item = T;
type Error = bincode::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.try_recv() {
Ok(msg) => Ok(Some(msg).into()),
Err(err) => match *err {
bincode::ErrorKind::IoError(ref e) if e.kind() == ErrorKind::ConnectionReset => {
Ok(Async::Ready(None))
}
bincode::ErrorKind::IoError(ref e) if e.kind() == ErrorKind::WouldBlock => {
Ok(Async::NotReady)
}
_ => Err(err),
},
}
}
}

impl<'de, T> Deserialize<'de> for IpcReceiver<T> where T: for<'dde> Deserialize<'dde> + Serialize {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de> {
let index: usize = try!(Deserialize::deserialize(deserializer));
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ extern crate fnv;
#[macro_use]
extern crate syscall;

#[cfg(feature = "async")]
extern crate futures;


pub mod ipc;
pub mod platform;
Expand Down

0 comments on commit 583f90c

Please sign in to comment.