diff --git a/Cargo.toml b/Cargo.toml index 2bbf98001..6728c3df3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ repository = "https://github.com/servo/ipc-channel" force-inprocess = [] memfd = ["syscall"] unstable = [] +async = ["futures"] [dependencies] bincode = "0.8" @@ -24,6 +25,7 @@ fnv = "1.0.3" mio = "0.6.1" syscall = { version = "0.2.1", optional = true } +futures = { version = "0.1", optional = true } [dev-dependencies] crossbeam = "0.2" diff --git a/src/ipc.rs b/src/ipc.rs index bf39a1eb6..c7a944237 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -20,6 +20,11 @@ use std::marker::PhantomData; use std::mem; use std::ops::Deref; +#[cfg(feature = "async")] +use futures::{Async, Poll, Stream}; +#[cfg(feature = "async")] +use std::io::ErrorKind; + thread_local! { static OS_IPC_CHANNELS_FOR_DESERIALIZATION: RefCell> = RefCell::new(Vec::new()) @@ -86,6 +91,27 @@ impl IpcReceiver where T: for<'de> Deserialize<'de> + Serialize { } } +#[cfg(feature = "async")] +impl Stream for IpcReceiver where T: for<'de> Deserialize<'de> + Serialize { + type Item = T; + type Error = bincode::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.try_recv() { + Ok(msg) => Ok(Some(msg).into()), + Err(err) => match *err { + bincode::ErrorKind::IoError(ref e) if e.kind() == ErrorKind::ConnectionReset => { + Ok(Async::Ready(None)) + } + bincode::ErrorKind::IoError(ref e) if e.kind() == ErrorKind::WouldBlock => { + Ok(Async::NotReady) + } + _ => Err(err), + }, + } + } +} + impl<'de, T> Deserialize<'de> for IpcReceiver where T: for<'dde> Deserialize<'dde> + Serialize { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de> { let index: usize = try!(Deserialize::deserialize(deserializer)); diff --git a/src/lib.rs b/src/lib.rs index 58a7f2d34..8b0e002e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,9 @@ extern crate fnv; #[macro_use] extern crate syscall; +#[cfg(feature = "async")] +extern crate futures; + pub mod ipc; pub mod platform;