diff --git a/Cargo.toml b/Cargo.toml
index 052b7eeb..34a8221c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,7 +18,6 @@ members = [
     "importer",
     "loader",
     "schema",
-    "client",
    "cli",
    "daemon",
    "core",
@@ -33,10 +32,11 @@ distill-importer = { version = "=0.0.3", path = "importer", optional = true }
 distill-loader = { version = "=0.0.3", path = "loader", optional = true }
 
 [dev-dependencies]
-futures = "0.3"
+futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
+futures-io = { version = "0.3", default-features = false }
+futures-util = { version = "0.3", default-features = false }
 serde = "1"
 uuid = "0.8.2"
-tokio = { version = "1.2", features = ["sync"] }
 serial_test = "0.5.1"
 
 [features]
diff --git a/README.md b/README.md
index 74f7b780..3727fb47 100644
--- a/README.md
+++ b/README.md
@@ -101,3 +101,8 @@ Licensed under either of
 at your option.
 
 PLEASE NOTE that some dependencies may be licensed under other terms. These are listed in [deny.toml](deny.toml) under licenses.exceptions on a best-effort basis, and are validated in every CI run using [cargo-deny](https://github.com/EmbarkStudios/cargo-deny).
+
+## Vendored Code
+
+In addition to crate dependencies, this project contains some vendored code:
+ * [daemon/src/timeout.rs](daemon/src/timeout.rs) - Used under the Apache 2.0/MIT license. (Only used in unit tests)
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index a6cde268..9000db18 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -12,11 +12,11 @@ distill-schema = { version = "=0.0.3", path = "../schema" }
 capnp = "0.14.0"
 capnp-rpc = "0.14.0"
-tokio = { version = "1.2", features = ["io-std", "rt", "rt-multi-thread", "net", "io-util"] }
-tokio-util = { version = "0.6.1", features = ["codec", "compat"] }
-futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
+futures-util = { version = "0.3", default-features = false }
 uuid = "0.8.2"
 async-trait = "0.1.22"
 crossterm = { version = "0.17", features = ["event-stream"] }
 defer = "0.1.0"
-tokio-stream = { version = "0.1.2", features = ["io-util"] }
+async-io = "1.4.1"
+async-net = "1.6.0"
+async-executor = "1.4.1"
\ No newline at end of file
diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index bfc88468..267ccb4b 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -9,6 +9,7 @@ use distill_schema::{
 };
 
 pub mod shell;
+use futures_util::AsyncReadExt;
 use shell::Autocomplete;
 pub use shell::Command;
 
@@ -52,16 +53,18 @@ pub struct Context {
     snapshot: Rc<RefCell<asset_hub::snapshot::Client>>,
 }
 
-pub async fn create_context() -> Result<Context, Box<dyn std::error::Error>> {
+pub async fn create_context(
+    local: &async_executor::LocalExecutor<'_>,
+) -> Result<Context, Box<dyn std::error::Error>> {
     use std::net::ToSocketAddrs;
     let addr = "127.0.0.1:9999".to_socket_addrs()?.next().unwrap();
-    let stream = tokio::net::TcpStream::connect(&addr).await?;
+    let stream = async_net::TcpStream::connect(&addr).await?;
     stream.set_nodelay(true).unwrap();
-    use tokio_util::compat::*;
-    let (reader, writer) = stream.into_split();
+
+    let (reader, writer) = stream.split();
     let rpc_network = Box::new(twoparty::VatNetwork::new(
-        reader.compat(),
-        writer.compat_write(),
+        reader,
+        writer,
         rpc_twoparty_capnp::Side::Client,
         *ReaderOptions::new()
             .nesting_limit(64)
@@ -72,14 +75,18 @@ pub async fn create_context() -> Result<Context, Box<dyn std::error::Error>> {
     let hub: asset_hub::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
     let _disconnector = rpc_system.get_disconnector();
-    tokio::task::spawn_local(rpc_system);
+
+    local.spawn(rpc_system).detach();
+
     let snapshot = Rc::new(RefCell::new({
         let request = hub.get_snapshot_request();
         request.send().promise.await?.get()?.get_snapshot()?
     }));
+
     let listener: asset_hub::listener::Client = capnp_rpc::new_client(ListenerImpl {
         snapshot: snapshot.clone(),
     });
+
     let mut request = hub.register_listener_request();
     request.get().set_listener(listener);
diff --git a/cli/src/main.rs b/cli/src/main.rs
index 78ce5543..144e787f 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -1,13 +1,14 @@
 use distill_cli::{shell::Shell, *};
 
 pub fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let runtime = tokio::runtime::Runtime::new().unwrap();
-    let local = tokio::task::LocalSet::new();
-    runtime.block_on(local.run_until(async_main()))
+    let local = async_executor::LocalExecutor::new();
+    async_io::block_on(local.run(async_main(&local)))
 }
 
-async fn async_main() -> Result<(), Box<dyn std::error::Error>> {
-    let ctx = create_context().await?;
+async fn async_main(
+    local: &async_executor::LocalExecutor<'_>,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let ctx = create_context(local).await?;
 
     let mut shell = Shell::new(ctx);
diff --git a/cli/src/shell.rs b/cli/src/shell.rs
index a4f5ab17..52d379ec 100644
--- a/cli/src/shell.rs
+++ b/cli/src/shell.rs
@@ -8,7 +8,7 @@ use crossterm::{
     style::{Color, Print, ResetColor, SetBackgroundColor, SetForegroundColor},
     terminal::{disable_raw_mode, enable_raw_mode, Clear, ClearType},
 };
-use futures::{
+use futures_util::{
     future::{pending, FusedFuture, FutureExt},
     select,
     stream::StreamExt,
@@ -425,8 +425,7 @@ impl Shell {
                             break 'event_loop;
                         }
                         _ => {
-                            let items =
-                                std::mem::replace(&mut autocomplete.items, Vec::new());
+                            let items = std::mem::take(&mut autocomplete.items);
                             state = State::Select {
                                 overlap: autocomplete.overlap,
                                 items,
diff --git a/client/Cargo.toml b/client/Cargo.toml
index aca345cc..e69de29b 100644
--- a/client/Cargo.toml
+++ b/client/Cargo.toml
@@ -1,16 +0,0 @@
-[package]
-name = "distill-client"
-version = "0.0.1"
-authors = ["Karl Bergström "]
-edition = "2018"
-license = "MIT OR Apache-2.0"
-publish = false
-
-[dependencies]
-distill-schema = { path = "../schema" }
-
-capnp = "0.14.0"
-capnp-rpc = "0.14.0"
-tokio = "1.2"
-tokio-util = { version = "0.6.1", features = ["compat"] }
-futures-util = { version = "0.3", default-features = false }
diff --git a/client/src/main.rs b/client/src/main.rs
deleted file mode 100644
index b2da2948..00000000
--- a/client/src/main.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-// this is just a test crate at the moment
-use std::{
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
-    },
-    thread,
-    time::Instant,
-};
-
-use capnp::message::ReaderOptions;
-use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
-use distill_schema::service::asset_hub;
-use tokio::runtime::Runtime;
-
-pub fn main() {
-    use std::net::ToSocketAddrs;
-
-    let addr = "127.0.0.1:9999".to_socket_addrs().unwrap().next().unwrap();
-
-    let num_assets = Arc::new(AtomicUsize::new(0));
-    let byte_size = Arc::new(AtomicUsize::new(0));
-    let start_time = Instant::now();
-    let mut threads = Vec::new();
-    for _ in 0..8 {
-        let num_assets = num_assets.clone();
-        let byte_size = byte_size.clone();
-        threads.push(thread::spawn(move || {
-            let runtime = Runtime::new().unwrap();
-            let stream = runtime
-                .block_on(::tokio::net::TcpStream::connect(&addr))
-                .unwrap();
-            stream.set_nodelay(true).unwrap();
-            use futures_util::AsyncReadExt;
-            use tokio_util::compat::*;
-            let (reader, writer) = stream.compat().split();
-            let rpc_network = Box::new(twoparty::VatNetwork::new(
-                reader,
-                writer,
-                rpc_twoparty_capnp::Side::Client,
-                *ReaderOptions::new()
-                    .nesting_limit(64)
-                    .traversal_limit_in_words(Some(256 * 1024 * 1024)),
-            ));
-
-            let mut rpc_system = RpcSystem::new(rpc_network, None);
-            let hub: asset_hub::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
-            let disconnector = rpc_system.get_disconnector();
-
-            // Doesn't work because RpcSystem is not Send
-            // use futures_util::future::TryFutureExt;
-            // runtime.spawn(rpc_system.map_err(|_| ()));
-
-            // This can be replaced by the above if RpcSystem becomes Send
-            tokio::task::spawn_local(rpc_system);
-
-            let request = hub.get_snapshot_request();
-            let snapshot = runtime
-                .block_on(request.send().promise)
-                .unwrap()
-                .get()
-                .unwrap()
-                .get_snapshot()
-                .unwrap();
-            for _i in 0..1000 {
-                let request = snapshot.get_all_asset_metadata_request();
-                let result = runtime.block_on(request.send().promise).unwrap();
-                let result = result.get().unwrap();
-                let len = result.get_assets().unwrap().len();
-                num_assets.fetch_add(len as usize, Ordering::SeqCst);
-                byte_size.fetch_add(
-                    result.total_size().unwrap().word_count as usize * 8,
-                    Ordering::SeqCst,
-                );
-            }
-            runtime
-                .block_on(disconnector)
-                .expect("Failed to block on RPC disconnector.");
-
-            // Dropping the runtime blocks until it completes
-            std::mem::forget(runtime);
-        }));
-        thread::sleep(std::time::Duration::new(0, 10));
-    }
-    for thread in threads {
-        thread.join().unwrap();
-    }
-    let total_time = Instant::now() - start_time;
-    println!(
-        "got {} assets and {} bytes in {}ms",
-        num_assets.load(Ordering::Acquire),
-        byte_size.load(Ordering::Acquire),
-        total_time.as_millis()
-    );
-    println!(
-        "{} bytes per second and {} assets per second",
-        (byte_size.load(Ordering::Acquire) as f64 / total_time.as_secs_f64()),
-        (num_assets.load(Ordering::Acquire) as f64 / total_time.as_secs_f64()),
-    );
-}
diff --git a/core/Cargo.toml b/core/Cargo.toml
index cc0a2b3f..1bedf631 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -15,6 +15,7 @@ path_utils = ["dunce", "path-clean", "path-slash"]
 uuid = { version = "0.8.2", features = ["v4"] }
 serde = { version = "1", optional = true, features = ["derive"] }
 futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
+futures-channel = { version = "0.3", default-features = false, features = ["std"] }
 type-uuid = { version = "0.1.2", optional = true, default-features = false }
 dunce = { version = "1.0", optional = true }
 path-clean = { version = "0.1", optional = true }
diff --git a/core/src/distill_signal.rs b/core/src/distill_signal.rs
new file mode 100644
index 00000000..a893b0e8
--- /dev/null
+++ b/core/src/distill_signal.rs
@@ -0,0 +1,87 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Thin wrapper around `futures_channel::oneshot` to match the `tokio::sync::oneshot` interface.
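+///
+/// A minimal usage sketch (the executor is illustrative; any `block_on` that
+/// can drive a future works, `futures_executor` is not a dependency of this crate):
+///
+/// ```ignore
+/// use distill_core::distill_signal;
+///
+/// let (tx, rx) = distill_signal::oneshot::<u32>();
+/// tx.send(42).unwrap();
+/// assert_eq!(futures_executor::block_on(rx), Ok(42));
+/// ```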
+pub fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
+    let (sender, receiver) = futures_channel::oneshot::channel();
+    (Sender::new(sender), Receiver::new(receiver))
+}
+
+#[derive(Debug)]
+pub struct Receiver<T> {
+    inner: futures_channel::oneshot::Receiver<T>,
+}
+
+impl<T> Receiver<T> {
+    #[inline(always)]
+    pub(crate) fn new(inner: futures_channel::oneshot::Receiver<T>) -> Self {
+        Receiver { inner }
+    }
+
+    #[inline]
+    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+        match self.inner.try_recv() {
+            Ok(Some(x)) => Ok(x),
+            Ok(None) => Err(TryRecvError::Empty),
+            Err(_canceled) => Err(TryRecvError::Closed),
+        }
+    }
+}
+
+impl<T> Future for Receiver<T> {
+    type Output = Result<T, RecvError>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        // Poll the inner receiver rather than calling `try_recv`, so the
+        // task's waker is registered; returning `Pending` without a waker
+        // would leave the task asleep forever.
+        match Pin::new(&mut self.get_mut().inner).poll(cx) {
+            Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)),
+            Poll::Ready(Err(_canceled)) => Poll::Ready(Err(RecvError(()))),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Sender<T> {
+    inner: futures_channel::oneshot::Sender<T>,
+}
+
+impl<T> Sender<T> {
+    #[inline(always)]
+    pub(crate) fn new(inner: futures_channel::oneshot::Sender<T>) -> Self {
+        Sender { inner }
+    }
+
+    #[inline]
+    pub fn send(self, value: T) -> Result<(), RecvError> {
+        match self.inner.send(value) {
+            Ok(_) => Ok(()),
+            Err(_) => Err(RecvError(())),
+        }
+    }
+}
+
+use self::error::*;
+pub mod error {
+    use std::fmt;
+
+    #[derive(Debug, Eq, PartialEq)]
+    pub struct RecvError(pub(super) ());
+
+    #[derive(Debug, Eq, PartialEq)]
+    pub enum TryRecvError {
+        Empty,
+        Closed,
+    }
+
+    impl fmt::Display for TryRecvError {
+        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+            match self {
+                TryRecvError::Empty => write!(fmt, "channel empty"),
+                TryRecvError::Closed => write!(fmt, "channel closed"),
+            }
+        }
+    }
+
+    impl std::error::Error for TryRecvError {}
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index da9560d2..f5be7a2c 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -13,6 +13,8 @@ use uuid::Uuid;
 pub mod importer_context;
 pub mod utils;
 
+pub mod distill_signal;
+
 /// A universally unique identifier for an asset.
 /// An asset can be a value of any Rust type that implements
 /// [`TypeUuidDynamic`] + [serde::Serialize] + [Send].
diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml
index 170c221d..d722c24c 100644
--- a/daemon/Cargo.toml
+++ b/daemon/Cargo.toml
@@ -20,20 +20,10 @@ notify = "4.0.15"
 distill-downstream-lmdb-rkv = "0.11.0-windows-fix"
 rayon = { version = "1.3", optional = true }
 log = { version = "0.4", features = ["serde", "std"] }
-tokio = { version = "1.2", features = [
-    "net",
-    "fs",
-    "sync",
-    "time",
-    "rt",
-    "rt-multi-thread",
-    "io-util",
-    "macros",
-] }
-tokio-util = { version = "0.6.1", features = ["compat"] }
-futures = "0.3"
-async-lock = { version = "2.1" }
-async-channel = { version = "1.4" }
+futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
+futures-io = { version = "0.3", default-features = false }
+futures-util = { version = "0.3", default-features = false, features = ["std", "async-await", "async-await-macro"] }
+futures-channel = { version = "0.3", default-features = false, features = ["std"] }
 event-listener = { version = "2.4.0" }
 serde = "1"
 serde_derive = "1.0"
@@ -45,9 +35,19 @@ fern = { version = "0.6.0", optional = true }
 chrono = { version = "0.4.19", default-features = false, features = ["clock"], optional = true }
 pin-project = "1.0"
 uuid = { version = "0.8.2", features = ["v4"] }
+async-io = "1.4.1"
+async-executor = "1.4.1"
+async-net = "1.6.0"
+async-fs = "1.5.0"
+async-lock = { version = "2.1" }
+async-channel = { version = "1.4" }
+bevy_tasks = "0.5.0"
 
 [dev-dependencies]
 tempfile = "3.2.0"
+futures-test = "0.3.15"
+# used for timeout, which was vendored from async_std. It is only used in tests
+pin-project-lite = "0.2.6"
 
 [features]
 parallel_hash = ["rayon"]
diff --git a/daemon/src/asset_hub_service.rs b/daemon/src/asset_hub_service.rs
index 3161ec6e..0f4579bb 100644
--- a/daemon/src/asset_hub_service.rs
+++ b/daemon/src/asset_hub_service.rs
@@ -18,7 +18,7 @@ use distill_schema::{
     parse_artifact_metadata, parse_db_asset_ref, service::asset_hub,
 };
-use futures::{AsyncReadExt, TryFutureExt};
+use futures_util::{AsyncReadExt, TryFutureExt};
 
 use crate::{
     artifact_cache::ArtifactCache,
@@ -77,6 +77,7 @@ struct AssetHubSnapshotImpl {
 
 struct AssetHubImpl {
     ctx: Arc<ServiceContext>,
+    local: Rc<async_executor::LocalExecutor<'static>>,
 }
 
 fn build_artifact_message<T: AsRef<[u8]>>(
@@ -532,22 +533,25 @@ impl AssetHubImpl {
 
         let tx = self.ctx.hub.register_listener(tx);
-        tokio::task::spawn_local(async move {
-            while rx.recv().await.is_ok() {
-                let mut request = listener.update_request();
-                let snapshot = AssetHubSnapshotImpl::new(ctx.clone()).await;
-                let latest_change = ctx
-                    .hub
-                    .get_latest_asset_change(snapshot.txn.txn())
-                    .expect("failed to get latest change");
-                request.get().set_latest_change(latest_change);
-                request.get().set_snapshot(capnp_rpc::new_client(snapshot));
-                if request.send().promise.await.is_err() {
-                    ctx.hub.drop_listener(tx);
-                    break;
+        self.local
+            .spawn(async move {
+                while rx.recv().await.is_ok() {
+                    let mut request = listener.update_request();
+                    let snapshot = AssetHubSnapshotImpl::new(ctx.clone()).await;
+                    let latest_change = ctx
+                        .hub
+                        .get_latest_asset_change(snapshot.txn.txn())
+                        .expect("failed to get latest change");
+                    request.get().set_latest_change(latest_change);
+                    request.get().set_snapshot(capnp_rpc::new_client(snapshot));
+                    if request.send().promise.await.is_err() {
+                        ctx.hub.drop_listener(tx);
+                        break;
+                    }
                 }
-            }
-        });
+            })
+            .detach();
+
         Ok(())
     }
 
@@ -583,29 +587,36 @@ impl AssetHubService {
     pub async fn run(&self, addr: std::net::SocketAddr) {
         let result: std::result::Result<(), Box<dyn std::error::Error>> = async {
-            let listener = tokio::net::TcpListener::bind(&addr).await?;
+            let listener = async_net::TcpListener::bind(&addr).await?;
             loop {
                 let (stream, _) = listener.accept().await?;
-                log::info!("tokio::net::TcpListener accepted");
-                stream.set_nodelay(true).unwrap();
-                use tokio_util::compat::*;
-                let (reader, writer) = stream.compat().split();
-
-                let service_impl = AssetHubImpl {
-                    ctx: self.ctx.clone(),
-                };
-                let hub_impl: asset_hub::Client = capnp_rpc::new_client(service_impl);
-
-                let network = twoparty::VatNetwork::new(
-                    reader,
-                    writer,
-                    rpc_twoparty_capnp::Side::Server,
-                    Default::default(),
-                );
-
-                let rpc_system = RpcSystem::new(Box::new(network), Some(hub_impl.client));
-                tokio::task::spawn_local(rpc_system.map_err(|_| ()));
+                let ctx = self.ctx.clone();
+
+                std::thread::spawn(|| {
+                    log::info!("async_net::TcpListener accepted");
+
+                    stream.set_nodelay(true).unwrap();
+                    let (reader, writer) = stream.split();
+
+                    let local = Rc::new(async_executor::LocalExecutor::new());
+                    let service_impl = AssetHubImpl {
+                        ctx,
+                        local: local.clone(),
+                    };
+
+                    let hub_impl: asset_hub::Client = capnp_rpc::new_client(service_impl);
+
+                    let network = twoparty::VatNetwork::new(
+                        reader,
+                        writer,
+                        rpc_twoparty_capnp::Side::Server,
+                        Default::default(),
+                    );
+
+                    let rpc_system = RpcSystem::new(Box::new(network), Some(hub_impl.client));
+                    async_io::block_on(local.run(rpc_system.map_err(|_| ()))).unwrap();
+                });
             }
         }
         .await;
diff --git a/daemon/src/daemon.rs b/daemon/src/daemon.rs
index 898c6ea9..c5f63b99 100644
--- a/daemon/src/daemon.rs
+++ b/daemon/src/daemon.rs
@@ -10,10 +10,12 @@ use std::{
 
 use asset_hub::AssetHub;
 use asset_hub_service::AssetHubService;
+use distill_core::distill_signal;
 use distill_importer::{BoxedImporter, ImporterContext};
 use distill_schema::data;
 use file_asset_source::FileAssetSource;
-use tokio::sync::oneshot::{self, Receiver, Sender};
+use futures_util::future::FutureExt;
+use std::rc::Rc;
 
 use crate::{
     artifact_cache::ArtifactCache, asset_hub, asset_hub_service, capnp_db::Environment,
@@ -165,23 +167,22 @@ impl AssetDaemon {
         self
     }
 
-    pub fn run(self) -> (JoinHandle<()>, Sender<bool>) {
-        let (tx, rx) = oneshot::channel();
+    pub fn run(self) -> (JoinHandle<()>, distill_signal::Sender<bool>) {
+        let (tx, rx) = distill_signal::oneshot();
         let handle = thread::spawn(|| {
-            let rpc_runtime = tokio::runtime::Builder::new_current_thread()
-                .enable_all()
-                .build()
-                .unwrap();
-            let local = tokio::task::LocalSet::new();
-
-            rpc_runtime.block_on(local.run_until(self.run_rpc_runtime(rx)))
+            let local = Rc::new(async_executor::LocalExecutor::new());
+            async_io::block_on(local.run(self.run_rpc_runtime(&local, rx)))
         });
 
         (handle, tx)
     }
 
-    async fn run_rpc_runtime(self, mut rx: Receiver<bool>) {
+    async fn run_rpc_runtime(
+        self,
+        local: &async_executor::LocalExecutor<'_>,
+        rx: distill_signal::Receiver<bool>,
+    ) {
         let cache_dir = self.db_dir.join("cache");
         let _ = fs::create_dir(&self.db_dir);
         let _ = fs::create_dir(&cache_dir);
@@ -244,18 +245,22 @@ impl AssetDaemon {
 
         let shutdown_tracker = tracker.clone();
-        let mut service_handle = tokio::task::spawn_local(async move { service.run(addr).await });
-        let mut tracker_handle = tokio::task::spawn_local(async move { tracker.run().await });
-        let mut asset_source_handle =
-            tokio::task::spawn_local(async move { asset_source.run().await });
+        let service_handle = local.spawn(async move { service.run(addr).await }).fuse();
+
+        let tracker_handle = local.spawn(async move { tracker.run().await }).fuse();
+        let asset_source_handle = local.spawn(async move { asset_source.run().await }).fuse();
+
+        let rx_fuse = rx.fuse();
+
+        futures_util::pin_mut!(service_handle, tracker_handle, asset_source_handle, rx_fuse);
 
         log::info!("Starting Daemon Loop");
         loop {
-            tokio::select! {
-                done = &mut service_handle => done.expect("ServiceHandle panicked"),
-                done = &mut tracker_handle => done.expect("FileTracker panicked"),
-                done = &mut asset_source_handle => done.expect("AssetSource panicked"),
-                done = &mut rx => match done {
+            futures_util::select! {
+                _done = &mut service_handle => panic!("ServiceHandle panicked"),
+                _done = &mut tracker_handle => panic!("FileTracker panicked"),
+                _done = &mut asset_source_handle => panic!("AssetSource panicked"),
+                done = &mut rx_fuse => match done {
                     Ok(_) => {
                         log::warn!("Shutting Down!");
                        shutdown_tracker.stop().await;
@@ -267,7 +272,7 @@ impl AssetDaemon {
                     }
                     Err(_) => continue,
                 }
-            }
+            };
         }
     }
 }
diff --git a/daemon/src/file_asset_source.rs b/daemon/src/file_asset_source.rs
index 6573c51c..8edb94c6 100644
--- a/daemon/src/file_asset_source.rs
+++ b/daemon/src/file_asset_source.rs
@@ -19,7 +19,8 @@ use distill_schema::{
     data::{self, path_refs, source_metadata},
     parse_db_metadata,
 };
-use futures::{channel::mpsc::unbounded, lock::Mutex, stream::StreamExt};
+use futures_channel::mpsc::unbounded;
+use futures_util::{lock::Mutex, stream::StreamExt};
 use log::{debug, error, info};
 #[cfg(feature = "rayon")]
 use rayon::prelude::*;
@@ -45,6 +46,7 @@ pub(crate) struct FileAssetSource {
     tables: FileAssetSourceTables,
     importers: Arc<ImporterMap>,
     importer_contexts: Arc<Vec<Box<dyn ImporterContext>>>,
+    runtime: bevy_tasks::IoTaskPool,
 }
 
 struct FileAssetSourceTables {
@@ -176,6 +178,7 @@ impl FileAssetSource {
                 reverse_path_refs: db
                     .create_db(Some("reverse_path_refs"), lmdb::DatabaseFlags::default())?,
             },
+            runtime: bevy_tasks::IoTaskPool(bevy_tasks::TaskPoolBuilder::default().build()),
            importers: importers.clone(),
            importer_contexts,
        })
@@ -1207,140 +1210,150 @@ impl FileAssetSource {
         let metadata_changes = Mutex::new(HashMap::new());
         let metadata_changes_ref = &metadata_changes;
-        // safety: mem::forget is not used on the scope.
-        let mut import_scope = unsafe { crate::scope::Scope::create() };
-
-        for p in hashed_files {
-            let processed_pair = p.clone();
-            import_scope.spawn(async move {
-                let read_txn = self
-                    .db
-                    .ro_txn()
-                    .await
-                    .expect("failed to open RO transaction");
-                let cache = DBSourceMetadataCache {
-                    txn: &read_txn,
-                    file_asset_source: &self,
-                    _marker: std::marker::PhantomData,
-                };
-                let result = source_pair_import::import_pair(
-                    &cache,
-                    &self.importers,
-                    &self.importer_contexts,
-                    &processed_pair,
-                    &mut Vec::new(),
-                )
-                .await;
-
-                let result = match result {
-                    Err(e) => return (processed_pair, Err(e)),
-                    Ok(result) => result,
-                };
-
-                if let Some((import, import_output)) = result {
-                    let metadata = if let Some(mut import_output) = import_output {
-                        // TODO store reported errors and warnings in metadata
-                        if let Some(import_op) = import_output.import_op {
-                            for error in &import_op.errors {
-                                log::error!("Import errors {:?}: {:?}", p.source, error);
-                            }
-                            for warning in &import_op.warnings {
-                                log::warn!("Import warning {:?}: {:?}", p.source, warning);
-                            }
-                        }
-                        // put import artifact in cache if it doesn't have unresolved refs
-                        if !import_output.assets.is_empty() {
-                            let mut txn = self
-                                .artifact_cache
-                                .rw_txn()
-                                .await
-                                .expect("failed to get cache txn");
-
-                            for asset in import_output.assets.iter_mut() {
-                                if asset.is_fully_resolved() {
-                                    if let Some(serialized_asset) = asset.serialized_asset.as_mut()
-                                    {
-                                        serialized_asset.metadata.id =
-                                            ArtifactId(utils::calc_import_artifact_hash(
-                                                &asset.metadata.id,
-                                                import.import_hash().unwrap(),
-                                                serialized_asset
-                                                    .metadata
-                                                    .load_deps
-                                                    .iter()
-                                                    .chain(
-                                                        serialized_asset.metadata.build_deps.iter(),
-                                                    )
-                                                    .map(|dep| dep.expect_uuid()),
-                                            ));
-                                        log::trace!(
-                                            "caching asset {:?} from file {:?} with hash {:?}",
-                                            asset.metadata.id,
-                                            p.source,
-                                            serialized_asset.metadata.id
-                                        );
-                                        self.artifact_cache.insert(&mut txn, serialized_asset);
-                                    } else {
-                                        log::trace!("asset {:?} from file {:?} did not return serialized asset: cannot cache", asset.metadata.id, p.source);
-                                    }
-                                } else {
-                                    log::trace!("asset {:?} from file {:?} not fully resolved: cannot cache", asset.metadata.id, p.source);
-                                }
-                            }
-                            txn.commit().expect("failed to commit cache txn");
-                        }
-
-                        Some(PairImportResultMetadata {
-                            import_state: import,
-                            assets: import_output
-                                .assets
-                                .into_iter()
-                                .map(|a| AssetImportResultMetadata {
-                                    metadata: a.metadata,
-                                    unresolved_load_refs: a.unresolved_load_refs,
-                                    unresolved_build_refs: a.unresolved_build_refs,
-                                })
-                                .collect(),
-                        })
-                    } else {
-                        None
-                    };
-
-                    let path = &processed_pair
-                        .source
-                        .as_ref()
-                        .or_else(|| processed_pair.meta.as_ref())
-                        .expect("a successful import must have a source or meta FileState")
-                        .path;
-
-                    metadata_changes_ref
-                        .lock()
-                        .await
-                        .insert(path.clone(), metadata);
-                };
-
-                (processed_pair, Ok(()))
-            });
-        }
-
-        while let Some((pair, maybe_result)) = import_scope.next().await {
-            match maybe_result {
-                // Successful import
-                Ok(()) => {
-                    let mut txn = txn.lock().await;
-                    self.ack_dirty_file_states(&mut txn, &pair);
-                }
-                Err(e) => {
-                    error!(
-                        "Error processing pair at {:?}: {}",
-                        pair.source.as_ref().map(|s| &s.path),
-                        e
-                    )
-                }
-            }
-        }
-
-        drop(import_scope);
+        self.runtime.scope(|scope| {
+            let local = async_executor::LocalExecutor::new();
+            async_io::block_on(local.run(async {
+                let (sender, mut receiver) = unbounded();
+
+                for p in hashed_files {
+                    let processed_pair = p.clone();
+                    let sender = sender.clone();
+
+                    scope.spawn(async move {
+                        let read_txn = self
+                            .db
+                            .ro_txn()
+                            .await
+                            .expect("failed to open RO transaction");
+
+                        let cache = DBSourceMetadataCache {
+                            txn: &read_txn,
+                            file_asset_source: &self,
+                            _marker: std::marker::PhantomData,
+                        };
+
+                        let result = source_pair_import::import_pair(
+                            &cache,
+                            &self.importers,
+                            &self.importer_contexts,
+                            &processed_pair,
+                            &mut Vec::new(),
+                        ).await;
+
+                        let result = match result {
+                            Err(e) => {
+                                sender.unbounded_send((processed_pair.clone(), Err(e))).expect("failed to send");
+                                return;
+                            },
+                            Ok(result) => result,
+                        };
+
+                        if let Some((import, import_output)) = result {
+                            let metadata = if let Some(mut import_output) = import_output {
+                                // TODO store reported errors and warnings in metadata
+                                if let Some(import_op) = import_output.import_op {
+                                    for error in &import_op.errors {
+                                        log::error!("Import errors {:?}: {:?}", p.source, error);
+                                    }
+                                    for warning in &import_op.warnings {
+                                        log::warn!("Import warning {:?}: {:?}", p.source, warning);
+                                    }
+                                }
+                                // put import artifact in cache if it doesn't have unresolved refs
+                                if !import_output.assets.is_empty() {
+                                    let mut txn = self
+                                        .artifact_cache
+                                        .rw_txn()
+                                        .await
+                                        .expect("failed to get cache txn");
+
+                                    for asset in import_output.assets.iter_mut() {
+                                        if asset.is_fully_resolved() {
+                                            if let Some(serialized_asset) = asset.serialized_asset.as_mut()
+                                            {
+                                                serialized_asset.metadata.id =
+                                                    ArtifactId(utils::calc_import_artifact_hash(
+                                                        &asset.metadata.id,
+                                                        import.import_hash().unwrap(),
+                                                        serialized_asset
+                                                            .metadata
+                                                            .load_deps
+                                                            .iter()
+                                                            .chain(
+                                                                serialized_asset.metadata.build_deps.iter(),
+                                                            )
+                                                            .map(|dep| dep.expect_uuid()),
+                                                    ));
+                                                log::trace!(
+                                                    "caching asset {:?} from file {:?} with hash {:?}",
+                                                    asset.metadata.id,
+                                                    p.source,
+                                                    serialized_asset.metadata.id
+                                                );
+                                                self.artifact_cache.insert(&mut txn, serialized_asset);
+                                            } else {
+                                                log::trace!("asset {:?} from file {:?} did not return serialized asset: cannot cache", asset.metadata.id, p.source);
+                                            }
+                                        } else {
+                                            log::trace!("asset {:?} from file {:?} not fully resolved: cannot cache", asset.metadata.id, p.source);
+                                        }
+                                    }
+                                    txn.commit().expect("failed to commit cache txn");
+                                }
+
+                                Some(PairImportResultMetadata {
+                                    import_state: import,
+                                    assets: import_output
+                                        .assets
+                                        .into_iter()
+                                        .map(|a| AssetImportResultMetadata {
+                                            metadata: a.metadata,
+                                            unresolved_load_refs: a.unresolved_load_refs,
+                                            unresolved_build_refs: a.unresolved_build_refs,
+                                        })
+                                        .collect(),
+                                })
+                            } else {
+                                None
+                            };
+
+                            let path = &processed_pair
+                                .source
+                                .as_ref()
+                                .or_else(|| processed_pair.meta.as_ref())
+                                .expect("a successful import must have a source or meta FileState")
+                                .path;
+
+                            metadata_changes_ref
+                                .lock()
+                                .await
+                                .insert(path.clone(), metadata);
+                        };
+
+                        sender.unbounded_send((processed_pair.clone(), Ok(()))).expect("failed to send");
+                    });
+                }
+
+                std::mem::drop(sender);
+
+                while let Some((pair, maybe_result)) = receiver.next().await {
+                    match maybe_result {
+                        // Successful import
+                        Ok(()) => {
+                            let mut txn = txn.lock().await;
+                            self.ack_dirty_file_states(&mut txn, &pair);
+                        }
+                        Err(e) => {
+                            error!(
+                                "Error processing pair at {:?}: {}",
+                                pair.source.as_ref().map(|s| &s.path),
+                                e
+                            )
+                        }
+                    }
+                }
+            }))
+        });
 
         let mut change_batch = asset_hub::ChangeBatch::new();
         let txn = txn.into_inner();
diff --git a/daemon/src/file_tracker.rs b/daemon/src/file_tracker.rs
index a553f186..ce50ef7d 100644
--- a/daemon/src/file_tracker.rs
+++ b/daemon/src/file_tracker.rs
@@ -17,17 +17,14 @@ use std::{
 };
 
 use distill_core::utils::{self, canonicalize_path};
 use distill_schema::data::{self, dirty_file_info, rename_file_event, source_file_info, FileType};
 use event_listener::Event;
-use futures::{
-    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
+use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
+use futures_util::{
+    lock::Mutex,
     stream::StreamExt,
     FutureExt,
 };
 use lmdb::Cursor;
 use log::{debug, info};
-use tokio::{
-    sync::{mpsc, Mutex},
-    time,
-};
 
 use crate::{
     capnp_db::{
@@ -60,6 +57,7 @@ pub struct FileTracker {
     stopping_event: event_listener::Event,
     watch_dirs: RwLock<Vec<PathBuf>>,
 }
+
 #[derive(Clone)]
 pub struct FileState {
     pub path: PathBuf,
@@ -518,7 +516,7 @@ impl FileTracker {
     }
 
     pub async fn add_dirty_file(&self, txn: &mut RwTransaction<'_>, path: &Path) -> Result<()> {
-        let metadata = match tokio::fs::metadata(path).await {
+        let metadata = match async_fs::metadata(path).await {
             Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => None,
             Err(e) => return Err(Error::IO(e)),
             Ok(metadata) => Some(watcher::file_metadata(&metadata)),
@@ -669,7 +667,7 @@ impl FileTracker {
         if self.is_running() {
             self.stopping_event.notify(std::usize::MAX);
             self.listener_rx.lock().await;
-            assert_eq!(false, self.is_running.load(Ordering::Acquire));
+            assert!(!self.is_running.load(Ordering::Acquire));
         }
     }
 
@@ -686,7 +684,7 @@ impl FileTracker {
             return;
         }
 
-        let (watcher_tx, mut watcher_rx) = mpsc::unbounded_channel();
+        let (watcher_tx, watcher_rx) = unbounded();
 
         let to_watch: Vec<PathBuf> = self.get_watch_dirs();
 
         // NOTE(happens): If we can't watch the dir, we want to abort
@@ -700,24 +698,32 @@ impl FileTracker {
         let mut listeners = ListenersList::new();
         let mut scan_stack = Vec::new();
 
-        let mut stopping = self.stopping_event.listen();
+        let stopping = self.stopping_event.listen().fuse();
 
         let mut listener_rx_guard = self.listener_rx.lock().await;
-        let listener_rx = listener_rx_guard.get_mut();
+
+        let listener_rx = listener_rx_guard.get_mut().fuse();
+        let watcher_rx = watcher_rx.fuse();
+
+        futures_util::pin_mut!(watcher_rx, listener_rx, stopping);
 
         let mut dirty = false;
         #[allow(unused_mut)]
         loop {
-            let mut delay = time::sleep(Duration::from_millis(40));
+            let delay = futures_util::future::FutureExt::fuse(async_io::Timer::after(Duration::from_millis(40)));
 
-            tokio::select! {
+            futures_util::pin_mut!(delay);
+
+            futures_util::select! {
                 new_listener = listener_rx.next() => listeners.register(new_listener),
-                _ = delay, if dirty => {
-                    listeners.send_event(FileTrackerEvent::Update);
-                    dirty = false;
+                _ = delay => {
+                    if dirty {
+                        listeners.send_event(FileTrackerEvent::Update);
+                        dirty = false;
+                    }
                 },
-                mut maybe_file_event = watcher_rx.recv() => {
+                mut maybe_file_event = watcher_rx.next() => {
                     if maybe_file_event.is_none() {
                         debug!("FileTracker: stopping due to exhausted watcher");
                        break;
@@ -733,7 +739,7 @@ impl FileTracker {
                         Err(err) => panic!("Error while handling file event: {}", err),
                     }
 
-                    maybe_file_event = watcher_rx.recv().now_or_never().flatten();
+                    maybe_file_event = watcher_rx.next().now_or_never().flatten();
                 }
 
                 if txn.dirty {
@@ -766,12 +772,11 @@ pub mod tests {
         time::Duration,
     };
 
-    use tokio::time::timeout;
-
     use super::*;
     use crate::{
         capnp_db::Environment,
         file_tracker::{FileTracker, FileTrackerEvent},
+        timeout::timeout,
     };
 
     pub async fn with_tracker<F, T>(f: F)
@@ -798,18 +803,20 @@ pub mod tests {
         tracker.register_listener(tx);
 
         let tracker_clone = tracker.clone();
-        let handle = tokio::task::spawn(async move { tracker_clone.run().await });
 
-        expect_event(&mut rx).await;
+        let runtime = async_executor::Executor::new();
+        let handle = runtime.spawn(async move { tracker_clone.run().await });
 
-        f(tracker.clone(), rx, asset_dir.into_path()).await;
-
-        tracker.stop().await;
-        handle.await.unwrap();
+        // A spawned task only makes progress while its executor is driven, so
+        // run the executor for the duration of the test body.
+        runtime
+            .run(async {
+                expect_event(&mut rx).await;
+
+                f(tracker.clone(), rx, asset_dir.into_path()).await;
+
+                tracker.stop().await;
+                handle.await;
+            })
+            .await;
     }
 
     async fn expect_no_event(rx: &mut UnboundedReceiver<FileTrackerEvent>) {
         match timeout(Duration::from_millis(1000), rx.next()).await {
-            Err(_) => return,
+            Err(_) => {}
             Ok(evt) => panic!("Received unexpected event {:?}", evt),
         }
     }
 
@@ -817,7 +824,7 @@ pub mod tests {
     async fn expect_event(rx: &mut UnboundedReceiver<FileTrackerEvent>) -> FileTrackerEvent {
         match timeout(Duration::from_millis(10000), rx.next()).await {
             Err(_) => panic!("Timed out waiting for file event"),
-            Ok(evt) => return evt.unwrap(),
+            Ok(evt) => evt.unwrap(),
         }
     }
 
@@ -842,12 +849,12 @@ pub mod tests {
 
     pub async fn add_test_dir(asset_dir: &Path, name: &str) -> PathBuf {
         let path = PathBuf::from(asset_dir).join(name);
-        tokio::fs::create_dir(&path).await.expect("create dir");
+        async_fs::create_dir(&path).await.expect("create dir");
         path
     }
 
     pub async fn add_test_file(asset_dir: &Path, name: &str) {
-        tokio::fs::copy(
+        async_fs::copy(
             PathBuf::from("tests/file_tracker/").join(name),
             asset_dir.join(name),
         )
@@ -928,7 +935,7 @@ pub mod tests {
         }
     }
 
-    #[tokio::test]
+    #[futures_test::test]
     async fn test_create_file() {
         with_tracker(|t, mut rx, asset_dir| async move {
             add_test_file(&asset_dir, "test.txt").await;
@@ -940,7 +947,7 @@ pub mod tests {
         .await;
    }
 
-    #[tokio::test]
+    #[futures_test::test]
     async fn test_modify_file() {
         with_tracker(|t, mut rx, asset_dir| async move {
             add_test_file(&asset_dir, "test.txt").await;
@@ -949,7 +956,7 @@ pub mod tests {
             expect_dirty_file_state(&t, &asset_dir, "test.txt").await;
             clear_dirty_file_state(&t).await;
 
-            tokio::fs::File::create(asset_dir.join("test.txt"))
+            async_fs::File::create(asset_dir.join("test.txt"))
                 .await
                 .expect("truncate test file");
 
@@ -961,7 +968,7 @@ pub mod tests {
         .await;
     }
 
-    #[tokio::test]
+    #[futures_test::test]
     async fn test_delete_file() {
         with_tracker(|t, mut rx, asset_dir| async move {
             add_test_file(&asset_dir, "test.txt").await;
@@ -970,7 +977,7 @@ pub mod tests {
             expect_dirty_file_state(&t, &asset_dir, "test.txt").await;
             clear_dirty_file_state(&t).await;
 
-            tokio::fs::remove_file(asset_dir.join("test.txt"))
+            async_fs::remove_file(asset_dir.join("test.txt"))
                 .await
                 .expect("test file could not be deleted");
 
@@ -982,7 +989,7 @@ pub mod tests {
         .await;
     }
 
-    #[tokio::test]
+    #[futures_test::test]
     async fn test_create_dir() {
         with_tracker(|t, mut rx, asset_dir| async move {
             add_test_dir(&asset_dir, "testdir").await;
@@ -994,7 +1001,7 @@ pub mod tests {
         .await;
     }
 
-    #[tokio::test]
+    #[futures_test::test]
     async fn test_create_file_in_dir() {
         with_tracker(|t, mut rx, asset_dir| async move {
             let dir = add_test_dir(&asset_dir, "testdir").await;
@@ -1013,7 +1020,7 @@ pub mod tests {
         .await;
     }
 
-    #[tokio::test]
+    #[futures_test::test]
     async fn test_create_emacs_lockfile() {
         with_tracker(|t, mut rx, asset_dir| async move {
             add_symlink_file(
@@ -1030,7 +1037,7 @@ pub mod tests {
         .await;
     }
 
-    #[tokio::test]
+    #[futures_test::test]
     async fn test_create_symlink_dir() {
         with_tracker(|t, mut rx, asset_dir| {
             async move {
@@ -1054,7 +1061,7 @@ pub mod tests {
         .await;
     }
 
-    #[tokio::test]
+    #[futures_test::test]
     async fn test_delete_symlink_dir() {
         with_tracker(|t, mut rx, asset_dir| async move {
             let watch_dir = tempfile::tempdir().unwrap();
@@ -1067,11 +1074,11 @@ pub mod tests {
             assert!(t.get_watch_dirs() == vec![asset_dir.clone(), watch_dir.path().to_path_buf()]);
 
             #[cfg(target_family = "windows")]
-            tokio::fs::remove_dir(asset_dir.join("dir_symlink"))
+            async_fs::remove_dir(asset_dir.join("dir_symlink"))
                 .await
                 .expect("test file could not be deleted");
             #[cfg(target_family = "unix")]
-            tokio::fs::remove_file(asset_dir.join("dir_symlink"))
+            async_fs::remove_file(asset_dir.join("dir_symlink"))
                 .await
                 .expect("test file could not be deleted");
 
diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs
index 9bc86790..cc364d26 100644
--- a/daemon/src/lib.rs
+++ b/daemon/src/lib.rs
@@ -10,11 +10,14 @@ mod daemon;
 mod error;
 mod file_asset_source;
 mod file_tracker;
-mod scope;
 mod serialized_asset;
 mod source_pair_import;
 mod watcher;
 
+// This module is only used from test code
+#[cfg(test)]
+mod timeout;
+
 pub use crate::{
     daemon::{default_importer_contexts, default_importers, AssetDaemon, ImporterMap},
     error::{Error, Result},
@@ -74,8 +77,6 @@ pub fn init_logging() -> Result<()> {
         })
         .chain(std::io::stdout())
         .level(log_level)
-        .level_for("mio", log::LevelFilter::Info)
-        .level_for("tokio_core", log::LevelFilter::Info)
         // .chain(fern::log_file("output.log")?)
         .apply()?;
     Ok(())
diff --git a/daemon/src/scope.rs b/daemon/src/scope.rs
deleted file mode 100644
index 91551348..00000000
--- a/daemon/src/scope.rs
+++ /dev/null
@@ -1,210 +0,0 @@
-#![allow(clippy::needless_lifetimes)]
-use std::{
-    marker::PhantomData,
-    pin::Pin,
-    task::{Context, Poll},
-};
-
-use futures::{
-    future::{BoxFuture, Future, FutureExt},
-    stream::{FuturesUnordered, Stream},
-};
-use pin_project::{pin_project, pinned_drop};
-use tokio::task::JoinHandle;
-
-/// A scope to allow controlled spawning of non 'static
-/// futures. Futures can be spawned using `spawn` or
-/// `spawn_cancellable` methods.
-///
-/// # Safety
-///
-/// This type uses its `Drop` implementation to guarantee
-/// safety. It is not safe to forget this object unless it
-/// is driven to completion.
-#[pin_project(PinnedDrop)]
-pub struct Scope<'a, T> {
-    done: bool,
-    len: usize,
-    remaining: usize,
-    #[pin]
-    futs: FuturesUnordered<JoinHandle<T>>,
-
-    // Future proof against variance changes
-    _marker: PhantomData<&'a mut &'a ()>,
-}
-
-impl<'a, T: Send + 'static> Scope<'a, T> {
-    /// Create a Scope object.
-    ///
-    /// This function is unsafe as `futs` may hold futures
-    /// which have to be manually driven to completion.
-    pub unsafe fn create() -> Self {
-        Scope {
-            done: false,
-            len: 0,
-            remaining: 0,
-            futs: FuturesUnordered::new(),
-            _marker: PhantomData,
-        }
-    }
-
-    /// Spawn a future with `async_std::task::spawn`. The
-    /// future is expected to be driven to completion before
-    /// 'a expires.
-    pub fn spawn<F: Future<Output = T> + Send + 'a>(&mut self, f: F) {
-        let handle =
-            tokio::spawn(unsafe { std::mem::transmute::<_, BoxFuture<'static, T>>(f.boxed()) });
-        self.futs.push(handle);
-        self.len += 1;
-        self.remaining += 1;
-    }
-}
-
-impl<'a, T> Scope<'a, T> {
-    /// Total number of futures spawned in this scope.
-    #[inline]
-    #[allow(dead_code)]
-    pub fn len(&self) -> usize {
-        self.len
-    }
-
-    /// Number of futures remaining in this scope.
-    #[inline]
-    #[allow(dead_code)]
-    pub fn remaining(&self) -> usize {
-        self.remaining
-    }
-
-    /// A slightly optimized `collect` on the stream. Also
-    /// useful when we can not move out of self.
-    pub async fn collect(&mut self) -> Vec<T> {
-        let mut proc_outputs = Vec::with_capacity(self.remaining);
-
-        use futures::stream::StreamExt;
-        while let Some(item) = self.next().await {
-            proc_outputs.push(item);
-        }
-
-        proc_outputs
-    }
-}
-
-impl<'a, T> Stream for Scope<'a, T> {
-    type Item = T;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.project();
-        let poll = this.futs.poll_next(cx);
-        if let Poll::Ready(None) = poll {
-            *this.done = true;
-        } else if poll.is_ready() {
-            *this.remaining -= 1;
-        }
-        poll.map(|t| t.map(|t| t.expect("task not driven to completion")))
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        (self.remaining, Some(self.remaining))
-    }
-}
-
-#[pinned_drop]
-impl<T> PinnedDrop for Scope<'_, T> {
-    fn drop(mut self: Pin<&mut Self>) {
-        if !self.done {
-            futures::executor::block_on(async {
-                // self.cancel().await;
-                self.collect().await;
-            });
-        }
-    }
-}
-
-/// Creates a `Scope` to spawn non-'static futures. The
-/// function is called with a block which takes an `&mut
-/// Scope`. The `spawn` method on this argument can be used
-/// to spawn "local" futures.
-///
-/// # Returns
-///
-/// The function returns the created `Scope`, and the return
-/// value of the block passed to it. The returned stream is
-/// expected to be driven completely before being
-/// forgotten. Dropping this stream causes the stream to be
-/// driven _while blocking the current thread_. The values
-/// returned from the stream are the output of the futures
-/// spawned.
-///
-/// # Safety
-///
-/// The returned stream is expected to be run to completion
-/// before being forgotten. Dropping it is okay, but blocks
-/// the current thread until all spawned futures complete.
-pub unsafe fn scope<'a, T: Send + 'static, R, F: FnOnce(&mut Scope<'a, T>) -> R>(
-    f: F,
-) -> (Scope<'a, T>, R) {
-    let mut scope = Scope::create();
-    let op = f(&mut scope);
-    (scope, op)
-}
-
-/// A function that creates a scope and immediately awaits,
-/// _blocking the current thread_ for spawned futures to
-/// complete. The outputs of the futures are collected as a
-/// `Vec` and returned along with the output of the block.
-///
-/// # Safety
-///
-/// This function is safe to the best of our understanding
-/// as it blocks the current thread until the stream is
-/// driven to completion, implying that all the spawned
-/// futures have completed too. However, care must be taken
-/// to ensure a recursive usage of this function doesn't
-/// lead to deadlocks.
-///
-/// When scope is used recursively, you may also use the
-/// unsafe `scope_and_*` functions as long as this function
-/// is used at the top level. In this case, either the
-/// recursively spawned futures should have the same lifetime
-/// as the top-level scope, or there should not be any spurious
-/// future cancellations within the top level scope.
-#[allow(dead_code)]
-pub fn scope_and_block<'a, T: Send + 'static, R, F: FnOnce(&mut Scope<'a, T>) -> R>(
-    f: F,
-) -> (R, Vec<T>) {
-    let (mut stream, block_output) = unsafe { scope(f) };
-    let proc_outputs = futures::executor::block_on(stream.collect());
-    (block_output, proc_outputs)
-}
-
-/// An asynchronous function that creates a scope and
-/// immediately awaits the stream. The outputs of the
-/// futures are collected as a `Vec` and returned along with
-/// the output of the block.
-///
-/// # Safety
-///
-/// This function is _not completely safe_: please see
-/// `cancellation_soundness` in [tests.rs][tests-src] for a
-/// test-case that suggests how this can lead to invalid
-/// memory access if not dealt with care.
-///
-/// The caller must ensure that the lifetime 'a is valid
-/// until the returned future is fully driven. Dropping the
-/// future is okay, but blocks the current thread until all
-/// spawned futures complete.
-///
-/// [tests-src]: https://github.com/rmanoka/async-scoped/blob/master/src/tests.rs
-#[allow(dead_code)]
-pub async unsafe fn scope_and_collect<
-    'a,
-    T: Send + 'static,
-    R,
-    F: FnOnce(&mut Scope<'a, T>) -> R,
->(
-    f: F,
-) -> (R, Vec<T>) {
-    let (mut stream, block_output) = scope(f);
-    let proc_outputs = stream.collect().await;
-    (block_output, proc_outputs)
-}
diff --git a/daemon/src/source_pair_import.rs b/daemon/src/source_pair_import.rs
index 1ef513f9..d823699e 100644
--- a/daemon/src/source_pair_import.rs
+++ b/daemon/src/source_pair_import.rs
@@ -14,10 +14,10 @@ use distill_importer::{
     SourceMetadata as ImporterSourceMetadata, SOURCEMETADATA_VERSION,
 };
 use distill_schema::data;
-use futures::future::{BoxFuture, Future};
+use futures_core::future::BoxFuture;
+use std::future::Future;
 use log::{debug, error};
 use serde::{Deserialize, Serialize};
-use tokio::{fs::File, io::AsyncReadExt};
 
 use crate::{
     daemon::ImporterMap,
@@ -26,6 +26,7 @@ use crate::{
     file_tracker::FileState,
     watcher::file_metadata,
 };
+use futures_util::AsyncReadExt;
 
 pub type SourceMetadata = ImporterSourceMetadata<Box<dyn SerdeObj>, Box<dyn SerdeObj>>;
 
@@ -72,15 +73,8 @@ pub(crate) struct AssetImportResult {
 
 impl AssetImportResult {
     pub(crate) fn is_fully_resolved(&self) -> bool {
-        self.unresolved_load_refs
-            .iter()
-            .find(|r| r.is_uuid())
-            .is_none()
-            && self
-                .unresolved_build_refs
-                .iter()
-                .find(|r| r.is_uuid())
-                .is_none()
+        !self.unresolved_load_refs.iter().any(|r| r.is_uuid())
+            && !self.unresolved_build_refs.iter().any(|r| r.is_uuid())
     }
 }
 
@@ -273,7 +267,7 @@ impl<'a> SourcePairImport<'a> {
             .importer
             .expect("cannot read metadata without an importer");
         let meta = utils::to_meta_path(&self.source);
-        let mut f = File::open(&meta).await?;
+        let mut f = async_fs::File::open(&meta).await?;
         scratch_buf.clear();
         f.read_to_end(scratch_buf).await?;
         let mut deserializer = ron::de::Deserializer::from_bytes(&scratch_buf)?;
@@ -534,13 +528,12 @@ impl<'a> SourcePairImport<'a> {
         let mut ctx = Self::get_importer_context_set(self.importer_contexts);
 
         let source = &self.source;
-        use tokio_util::compat::*;
         let exported = ctx
             .scope(async move {
-                let f = File::open(source).await?;
+                let mut f = async_fs::File::open(source).await?;
                 importer
                     .export_boxed(
-                        &mut f.compat(),
+                        &mut f,
                         metadata.importer_options,
                         metadata.importer_state,
                         assets
@@ -592,6 +585,7 @@ impl<'a> SourcePairImport<'a> {
         let import_op_ref = &mut import_op;
         let imported = ctx
             .scope(async move {
+                // TODO(dvd): Can this be replaced now that tokio is gone?
                 // This is broken on tokio 0.2.14 and later (concurrent file loads endlessly yield
                 // to each other).
                 // let mut f = File::open(source).await?;
@@ -603,13 +597,12 @@ impl<'a> SourcePairImport<'a> {
                 let mut f = std::fs::File::open(source)?;
                 let mut contents = vec![];
                 f.read_to_end(&mut contents)?;
-                let cursor = std::io::Cursor::new(contents);
+                let mut cursor = futures_util::io::Cursor::new(contents);
 
-                use tokio_util::compat::*;
                 importer
                     .import_boxed(
                         import_op_ref,
-                        &mut cursor.compat(),
+                        &mut cursor,
                        metadata.importer_options,
                        metadata.importer_state,
                    )
diff --git a/daemon/src/timeout.rs b/daemon/src/timeout.rs
new file mode 100644
index 00000000..cb5b3644
--- /dev/null
+++ b/daemon/src/timeout.rs
@@ -0,0 +1,98 @@
+// NOTE: This is vendored from async_std under the Apache 2.0/MIT license (see EDIT comments below
+// for changes).
+
+use std::error::Error;
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::time::Duration;
+
+use pin_project_lite::pin_project;
+
+// EDIT: use std directly
+//use crate::task::{Context, Poll};
+use std::task::{Context, Poll};
+
+// EDIT: Use async_io directly
+//use crate::utils::{timer_after, Timer};
+use async_io::Timer;
+
+/// Awaits a future or times out after a duration of time.
+///
+/// If you want to await an I/O future consider using
+/// [`io::timeout`](../io/fn.timeout.html) instead.
+///
+/// # Examples
+///
+/// ```
+/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
+/// #
+/// use std::time::Duration;
+///
+/// use async_std::future;
+///
+/// let never = future::pending::<()>();
+/// let dur = Duration::from_millis(5);
+/// assert!(future::timeout(dur, never).await.is_err());
+/// #
+/// # Ok(()) }) }
+/// ```
+// EDIT: Allow dead code (macOS does not run the tests that require this)
+#[allow(dead_code)]
+pub async fn timeout<F, T>(dur: Duration, f: F) -> Result<T, TimeoutError>
+where
+    F: Future<Output = T>,
+{
+    TimeoutFuture::new(f, dur).await
+}
+
+pin_project! {
+    /// A future that times out after a duration of time.
+    pub struct TimeoutFuture<F> {
+        #[pin]
+        future: F,
+        #[pin]
+        delay: Timer,
+    }
+}
+
+impl<F> TimeoutFuture<F> {
+    #[allow(dead_code)]
+    pub(super) fn new(future: F, dur: Duration) -> TimeoutFuture<F> {
+        TimeoutFuture {
+            future,
+            // EDIT: Call the timer directly
+            //delay: timer_after(dur),
+            delay: Timer::after(dur),
+        }
+    }
+}
+
+impl<F: Future> Future for TimeoutFuture<F> {
+    type Output = Result<F::Output, TimeoutError>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+        match this.future.poll(cx) {
+            Poll::Ready(v) => Poll::Ready(Ok(v)),
+            Poll::Pending => match this.delay.poll(cx) {
+                Poll::Ready(_) => Poll::Ready(Err(TimeoutError { _private: () })),
+                Poll::Pending => Poll::Pending,
+            },
+        }
+    }
+}
+
+/// An error returned when a future times out.
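+///
+/// A minimal sketch of how this error surfaces from [`timeout`] (the executor
+/// is illustrative; any `block_on` that can drive a future works):
+///
+/// ```ignore
+/// use std::time::Duration;
+///
+/// let timed_out = async_io::block_on(timeout(
+///     Duration::from_millis(5),
+///     std::future::pending::<()>(),
+/// ));
+/// assert!(timed_out.is_err());
+/// ```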
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub struct TimeoutError {
+    _private: (),
+}
+
+impl Error for TimeoutError {}
+
+impl fmt::Display for TimeoutError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        "future has timed out".fmt(f)
+    }
+}
diff --git a/daemon/src/watcher.rs b/daemon/src/watcher.rs
index dea56eaa..79d17cd9 100644
--- a/daemon/src/watcher.rs
+++ b/daemon/src/watcher.rs
@@ -8,9 +8,9 @@ use std::{
 
 use distill_core::utils::canonicalize_path;
 use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
-use tokio::sync::mpsc::UnboundedSender;
 
 use crate::error::{Error, Result};
+use futures_channel::mpsc::UnboundedSender;
 
 /// The purpose of DirWatcher is to provide enough information to
 /// determine which files may be candidates for going through the asset import process.
@@ -106,11 +106,11 @@ impl DirWatcher {
     {
         let canonical_dir = canonicalize_path(dir);
         self.asset_tx
-            .send(FileEvent::ScanStart(canonical_dir.clone()))
+            .unbounded_send(FileEvent::ScanStart(canonical_dir.clone()))
             .map_err(|_| Error::SendError)?;
         let result = self.scan_directory_recurse(&canonical_dir, evt_create);
         self.asset_tx
-            .send(FileEvent::ScanEnd(canonical_dir, self.dirs.clone()))
+            .unbounded_send(FileEvent::ScanEnd(canonical_dir, self.dirs.clone()))
             .map_err(|_| Error::SendError)?;
         result
     }
@@ -130,7 +130,9 @@ impl DirWatcher {
                 Ok(entry) => {
                     let evt = self.handle_notify_event(evt_create(entry.path()), true)?;
                     if let Some(evt) = evt {
-                        self.asset_tx.send(evt).map_err(|_| Error::SendError)?;
+                        self.asset_tx
+                            .unbounded_send(evt)
+                            .map_err(|_| Error::SendError)?;
                     }
                     let metadata;
                     match entry.metadata() {
@@ -154,7 +156,7 @@ impl DirWatcher {
         for dir in &self.dirs.clone() {
             if let Err(err) = self.scan_directory(&dir, &|path| DebouncedEvent::Create(path)) {
                 self.asset_tx
-                    .send(FileEvent::FileError(err))
+                    .unbounded_send(FileEvent::FileError(err))
                     .expect("Failed to send file error event. Ironic...");
             }
         }
@@ -165,7 +167,7 @@ impl DirWatcher {
                 Ok(maybe_event) => {
                     if let Some(evt) = maybe_event {
                         log::debug!("File event: {:?}", evt);
-                        self.asset_tx.send(evt).unwrap();
+                        self.asset_tx.unbounded_send(evt).unwrap();
                    }
                }
                Err(err) => match err {
@@ -175,7 +177,7 @@ impl DirWatcher {
                             self.scan_directory(&dir, &|path| DebouncedEvent::Create(path))
                         {
                             self.asset_tx
-                                .send(FileEvent::FileError(err))
+                                .unbounded_send(FileEvent::FileError(err))
                                .expect("Failed to send file error event");
                        }
                    }
                    Error::Exit => break,
                    _ => self
                        .asset_tx
-                        .send(FileEvent::FileError(err))
+                        .unbounded_send(FileEvent::FileError(err))
                        .expect("Failed to send file error event"),
                },
            },
            Err(_) => {
                self.asset_tx
-                    .send(FileEvent::FileError(Error::RecvError))
+                    .unbounded_send(FileEvent::FileError(Error::RecvError))
                    .expect("Failed to send file error event");
 
                return;
@@ -253,7 +255,7 @@ impl DirWatcher {
                 }
                 self.symlink_map.remove(src);
                 self.asset_tx
-                    .send(FileEvent::Unwatch(to_unwatch))
+                    .unbounded_send(FileEvent::Unwatch(to_unwatch))
                     .map_err(|_| Error::SendError)?;
             }
             Err(err) => {
@@ -274,7 +276,7 @@ impl DirWatcher {
                 }
                 self.symlink_map.insert(dst.clone(), link_path.clone());
                 self.asset_tx
-                    .send(FileEvent::Watch(link_path))
+                    .unbounded_send(FileEvent::Watch(link_path))
                     .map_err(|_| Error::SendError)?;
             }
             Err(Error::Notify(notify::Error::Generic(text)))
diff --git a/deny.toml b/deny.toml
index 01f6aded..0ff5bc27 100644
--- a/deny.toml
+++ b/deny.toml
@@ -21,7 +21,7 @@ skip = [
     { name = "mio", version = "0.6.23" }, # upgrade notify
     { name = "miow", version = "0.2.2" }, # upgrade notify
     { name = "cfg-if", version = "0.1.10" },
-    { name = "winapi", version = "0.2.8" }, # upgrade tokio and notify
+    { name = "winapi", version = "0.2.8" }, # upgrade notify
     { name = "redox_syscall", version = "0.1.57" }, # old version in notify
 ]
 
diff --git a/examples/handle_integration/Cargo.toml b/examples/handle_integration/Cargo.toml
index cac052f3..25fb7412 100644
--- a/examples/handle_integration/Cargo.toml
+++ b/examples/handle_integration/Cargo.toml
@@ -10,7 +10,6 @@ publish = false
 [dependencies]
 distill = { version = "=0.0.3", path = "../..", features = ["serde_importers", "pretty_log"] }
 futures-executor = { version = "0.3", default-features = false }
-tokio = { version = "1.2", features = ["io-util"] }
 image2 = { version = "0.11", features = ["ser"] }
 log = { version = "0.4", features = ["serde"] }
 
diff --git a/importer/Cargo.toml b/importer/Cargo.toml
index cc8e0a01..7fbbb084 100644
--- a/importer/Cargo.toml
+++ b/importer/Cargo.toml
@@ -15,7 +15,9 @@ serde = "1"
 erased-serde = "0.3"
 ron = { version = "0.6.4", optional = true }
 typetag = { version = "0.1", optional = true }
-futures = "0.3"
+futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
+futures-io = { version = "0.3", default-features = false }
+futures-util = { version = "0.3", default-features = false }
 log = { version = "0.4", features = ["serde"] }
 
 [features]
diff --git a/importer/src/boxed_importer.rs b/importer/src/boxed_importer.rs
index 05f66575..d9f25c55 100644
--- a/importer/src/boxed_importer.rs
+++ b/importer/src/boxed_importer.rs
@@ -1,6 +1,7 @@
 use distill_core::TypeUuidDynamic;
 use erased_serde::Deserializer;
-use futures::{future::BoxFuture, AsyncRead, AsyncWrite};
+use futures_core::future::BoxFuture;
+use futures_io::{AsyncRead, AsyncWrite};
 use serde::{Deserialize, Serialize};
 
 use crate::{error::Result, AsyncImporter, ExportAsset, ImportOp, ImporterValue, SerdeObj};
diff --git a/importer/src/lib.rs b/importer/src/lib.rs
index 06943a7c..b65f192f 100644
--- a/importer/src/lib.rs
+++ b/importer/src/lib.rs
@@ -14,7 +14,9 @@ pub use distill_core::{
 use distill_core::{AssetRef, AssetUuid};
 #[cfg(feature = "serde_importers")]
 pub use distill_serde_importable_derive::*;
-use futures::{future::BoxFuture, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures_core::future::BoxFuture;
+use futures_io::{AsyncRead, AsyncWrite};
+use futures_util::{AsyncReadExt, AsyncWriteExt};
 pub use serde;
 use serde::Serialize;
 
diff --git a/loader/Cargo.toml b/loader/Cargo.toml
index 241fa280..4c758c2a 100644
--- a/loader/Cargo.toml
+++ b/loader/Cargo.toml
@@ -11,8 +11,6 @@ distill-core = { path = "../core", version = "=0.0.3", features = ["serde-1"] }
 distill-schema = { path = "../schema", version = "=0.0.3", optional = true }
 
 crossbeam-channel = "0.5.0"
-tokio = { version = "1.2", features = ["sync", "rt", "rt-multi-thread"], optional = true }
-tokio-util = { version = "0.6.1", features = ["compat"], optional = true }
 futures-util = { version = "0.3", default-features = false, features = ["io"], optional = true }
 futures-channel = { version = "0.3", default-features = false, features = ["alloc"] }
 futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
@@ -23,6 +21,11 @@ dashmap = "4.0.1"
 serde = { version = "1", features = ["derive"], optional = true }
 uuid = { version = "0.8.2", optional = true }
 thread_local = { version = "1.0", optional = true }
+async-io = { version = "1.4.1", optional = true }
+async-executor = { version = "1.4.1", optional = true }
+async-net = { version = "1.6.0", optional = true }
+bevy_tasks = { version = "0.5.0", optional = true }
+pin-project-lite = "0.2.6"
 
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 memmap = { version = "0.7", optional = true }
@@ -35,23 +38,25 @@ instant = { version = "0.1", features = ["wasm-bindgen"] }
 default = []
 packfile_io = [
     "distill-schema",
-    "tokio",
     "capnp",
     "capnp-rpc",
-    "tokio-util",
     "futures-util",
     "memmap",
     "thread_local",
+    "async-io",
+    "async-executor",
+    "bevy_tasks",
     "invalidate_path"
 ]
 rpc_io = [
     "distill-schema",
-    "tokio",
     "capnp",
     "capnp-rpc",
     "futures-util",
     "invalidate_path",
-    "tokio/net"
+    "async-io",
+    "async-executor",
+    "async-net"
 ]
 invalidate_path = ["distill-core/path_utils"]
 handle = ["serde", "uuid"]
diff --git a/loader/src/handle.rs b/loader/src/handle.rs
index dfd8328d..fb4b896f 100644
--- a/loader/src/handle.rs
+++ b/loader/src/handle.rs
@@ -263,7 +263,7 @@ impl AssetHandle for WeakHandle {
     }
 }
 
-tokio::task_local! {
+crate::task_local! {
     static LOADER: &'static dyn LoaderInfoProvider;
     static REFOP_SENDER: Sender<RefOp>;
 }
 
@@ -403,7 +403,7 @@ impl<'a> distill_core::importer_context::ImporterContextHandle for DummySerdeCon
             panic!("end_serialize_asset when current_serde_asset is not set");
         }
         current.current_serde_asset = None;
-        std::mem::replace(&mut current.current_serde_dependencies, HashSet::new())
+        std::mem::take(&mut current.current_serde_dependencies)
     }
 }
 
diff --git a/loader/src/io.rs b/loader/src/io.rs
index 76dc2bd8..1f9f2a87 100644
--- a/loader/src/io.rs
+++ b/loader/src/io.rs
@@ -11,7 +11,6 @@ pub trait LoaderIO: Send + Sync {
     fn get_asset_candidates(&mut self, requests: Vec<ResolveRequest>);
     fn get_artifacts(&mut self, requests: Vec<DataRequest>);
     fn tick(&mut self, loader: &mut LoaderState);
-    fn with_runtime(&self, f: &mut dyn FnMut(&tokio::runtime::Runtime));
 }
 
 /// A request for an asset artifact's data.
diff --git a/loader/src/lib.rs b/loader/src/lib.rs
index 1022dd01..d6b50558 100644
--- a/loader/src/lib.rs
+++ b/loader/src/lib.rs
@@ -16,6 +16,8 @@ pub mod rpc_io;
 /// [`AssetStorage`](crate::storage::AssetStorage) is implemented by engines to store loaded asset data.
 pub mod storage;
 
+mod task_local;
+
 pub use crossbeam_channel;
 pub use distill_core::{AssetRef, AssetTypeId, AssetUuid};
 pub use loader::Loader;
diff --git a/loader/src/loader.rs b/loader/src/loader.rs
index 61537680..77dd4e76 100644
--- a/loader/src/loader.rs
+++ b/loader/src/loader.rs
@@ -14,7 +14,6 @@ use instant::Instant;
 use log::error;
 
 use crate::{
-    handle::{RefOp, SerdeContext},
     io::{DataRequest, LoaderIO, MetadataRequest, MetadataRequestResult, ResolveRequest},
     storage::{
         AssetLoadOp, AssetStorage, AtomicHandleAllocator, HandleAllocator, HandleOp,
@@ -503,8 +502,8 @@ impl LoaderState {
         }
         entry.value_mut().versions = versions;
 
+        let time_in_state = last_state_change_instant.elapsed().as_secs_f32();
         if state_change {
-            let time_in_state = last_state_change_instant.elapsed().as_secs_f32();
             log::debug!(
                 "{:?} {:?} => {:?} in {}s",
                 key,
@@ -515,7 +514,6 @@ impl LoaderState {
 
             entry.value_mut().last_state_change_instant = Instant::now();
         } else {
-            let time_in_state = last_state_change_instant.elapsed().as_secs_f32();
             log::trace!(
                 "process_load_states Key: {:?} State: {:?} Time in state: {}",
                 key,
@@ -1022,15 +1020,6 @@ impl Loader {
         }
     }
 
-    pub fn with_serde_context<R>(&self, tx: &Sender<RefOp>, mut f: impl FnMut() -> R) -> R {
-        let mut result = None;
-        self.io.with_runtime(&mut |runtime| {
-            result =
-                Some(runtime.block_on(SerdeContext::with(&self.data, tx.clone(), async { f() })));
-        });
-        result.unwrap()
-    }
-
     /// Returns the load handle for the asset with the given UUID, if present.
     ///
     /// This will only return `Some(..)` if there has been a previous call to [`Loader::add_ref`].
diff --git a/loader/src/packfile_io.rs b/loader/src/packfile_io.rs index 6fb3734e..b8a2b93b 100644 --- a/loader/src/packfile_io.rs +++ b/loader/src/packfile_io.rs @@ -68,8 +68,11 @@ impl Drop for PackfileMessageReaderFile { } #[derive(PartialEq)] +#[allow(dead_code)] enum RuntimeType { + // Use a single-threaded runtime (compatible with WASM) CurrentThread, + // Use a typical runtime, which may use multiple threads MultiThread, } @@ -123,7 +126,8 @@ struct PackfileReaderInner { reader: Box, index_by_uuid: HashMap, assets_by_path: HashMap>, - runtime: tokio::runtime::Runtime, + runtime: bevy_tasks::IoTaskPool, + #[allow(dead_code)] runtime_type: RuntimeType, } pub struct PackfileReader(Arc); @@ -168,8 +172,14 @@ impl PackfileReader { let runtime_type = RuntimeType::MultiThread; let runtime = match runtime_type { - RuntimeType::CurrentThread => tokio::runtime::Builder::new_current_thread().build()?, - RuntimeType::MultiThread => tokio::runtime::Builder::new_multi_thread().build()?, + // The CurrentThread codepath has not been verified for WASM; it needs to be confirmed + // as both necessary and correct before it is used + RuntimeType::CurrentThread => { + unimplemented!("packfile_io needs to be updated to support wasm") + } + RuntimeType::MultiThread => { + bevy_tasks::IoTaskPool(bevy_tasks::TaskPoolBuilder::default().build()) + } }; Ok(PackfileReader(Arc::new(PackfileReaderInner { @@ -263,50 +273,55 @@ impl PackfileReaderInner { impl LoaderIO for PackfileReader { fn get_asset_metadata_with_dependencies(&mut self, request: MetadataRequest) { - let _guard = self.0.runtime.enter(); let inner = self.0.clone(); - tokio::spawn(async move { - match inner.get_asset_metadata_with_dependencies_impl(&request) { - Ok(data) => request.complete(data), - Err(err) => request.error(err), - } - }); + self.0 + .runtime + .spawn(async move { + match inner.get_asset_metadata_with_dependencies_impl(&request) { + Ok(data) => request.complete(data), + Err(err) => request.error(err), + } + }) + .detach(); } fn get_asset_candidates(&mut self, requests: Vec) { - let _guard = self.0.runtime.enter(); for request in requests { let inner = self.0.clone(); - tokio::spawn(async move { - match inner.get_asset_candidates_impl(&request) { - Ok(data) => request.complete(data), - Err(err) => request.error(err), - } - }); + self.0 + .runtime + .spawn(async move { + match inner.get_asset_candidates_impl(&request) { + Ok(data) => request.complete(data), + Err(err) => request.error(err), + } + }) + .detach(); } } fn get_artifacts(&mut self, requests: Vec) { - let _guard = self.0.runtime.enter(); for request in requests { let inner = self.0.clone(); - tokio::spawn(async move { - match inner.get_artifact_impl(&request) { - Ok(data) => request.complete(data), - Err(err) => request.error(err), - } - }); + self.0 + .runtime + .spawn(async move { + match inner.get_artifact_impl(&request) { + Ok(data) => request.complete(data), + Err(err) => request.error(err), + } + }) + .detach(); } } fn tick(&mut self, _loader: &mut LoaderState) { + //TODO: Handle the CurrentThread case. Before we switched from tokio to bevy_tasks, this + // would deadlock because this task never yielded and there were no other threads trying to + // make progress on other tasks.
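+ // For comparison, the rpc_io backend later in this patch keeps its
+ // LocalExecutor moving by calling `runtime.local.try_tick()` from tick();
+ // a CurrentThread packfile runtime would likely need a similar nudge here.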
// We require this yield if the runtime is a CurrentThread runtime - if self.0.runtime_type == RuntimeType::CurrentThread { - self.0.runtime.block_on(tokio::task::yield_now()); - } - } - - fn with_runtime(&self, f: &mut dyn FnMut(&tokio::runtime::Runtime)) { - f(&self.0.runtime); + //if self.0.runtime_type == RuntimeType::CurrentThread { + // self.0.runtime.block_on(tokio::task::yield_now()); + //} } } diff --git a/loader/src/rpc_io.rs b/loader/src/rpc_io.rs index 8e3d0257..7d17b4d0 100644 --- a/loader/src/rpc_io.rs +++ b/loader/src/rpc_io.rs @@ -3,19 +3,15 @@ use std::{error::Error, path::PathBuf, sync::Mutex}; use capnp::message::ReaderOptions; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; use crossbeam_channel::{unbounded, Receiver, Sender}; -use distill_core::{utils, AssetMetadata, AssetUuid}; +use distill_core::{distill_signal, utils, AssetMetadata, AssetUuid}; use distill_schema::{data::asset_change_event, parse_db_metadata, service::asset_hub}; use futures_util::AsyncReadExt; -use tokio::{ - net::TcpStream, - runtime::{Builder, Runtime}, - sync::oneshot, -}; use crate::{ io::{DataRequest, LoaderIO, MetadataRequest, MetadataRequestResult, ResolveRequest}, loader::LoaderState, }; +use std::rc::Rc; type Promise = capnp::capability::Promise; @@ -36,15 +32,14 @@ struct SnapshotChange { enum InternalConnectionState { None, - Connecting(oneshot::Receiver>>), + Connecting(distill_signal::Receiver>>), Connected(RpcConnection), Error(Box), } -/// the tokio::Runtime and tasks, as well as the connection state +/// the executor and tasks, as well as the connection state struct RpcRuntime { - runtime: Runtime, - local: tokio::task::LocalSet, + local: Rc>, connection: InternalConnectionState, } @@ -101,59 +96,62 @@ impl RpcRuntime { _ => {} }; - let (conn_tx, conn_rx) = oneshot::channel(); - - self.local.spawn_local(async move { - let result = async move { - log::trace!("Tcp connect to {:?}", connect_string); - let stream = TcpStream::connect(connect_string).await?; - stream.set_nodelay(true)?; - - use tokio_util::compat::*; - let (reader, writer) = stream.compat().split(); - - log::trace!("Creating capnp VatNetwork"); - let rpc_network = Box::new(twoparty::VatNetwork::new( - reader, - writer, - rpc_twoparty_capnp::Side::Client, - *ReaderOptions::new() - .nesting_limit(64) - .traversal_limit_in_words(Some(256 * 1024 * 1024)), - )); - - let mut rpc_system = RpcSystem::new(rpc_network, None); - - let hub: asset_hub::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); - - let _disconnector = rpc_system.get_disconnector(); - - tokio::task::spawn_local(rpc_system); - - log::trace!("Requesting RPC snapshot.."); - let response = hub.get_snapshot_request().send().promise.await?; - - let snapshot = response.get()?.get_snapshot()?; - log::trace!("Received snapshot, registering listener.."); - let (snapshot_tx, snapshot_rx) = unbounded(); - let listener: asset_hub::listener::Client = capnp_rpc::new_client(ListenerImpl { - snapshot_channel: snapshot_tx, - snapshot_change: None, - }); - - let mut request = hub.register_listener_request(); - request.get().set_listener(listener); - let rpc_conn = request.send().promise.await.map(|_| RpcConnection { - snapshot, - snapshot_rx, - })?; - log::trace!("Registered listener, done connecting RPC loader."); + let (conn_tx, conn_rx) = distill_signal::oneshot(); + + let local = self.local.clone(); + self.local + .spawn(async move { + let result = async move { + log::trace!("Tcp connect to {:?}", connect_string); + let stream = 
async_net::TcpStream::connect(connect_string).await?; + stream.set_nodelay(true)?; + + let (reader, writer) = stream.split(); + + log::trace!("Creating capnp VatNetwork"); + let rpc_network = Box::new(twoparty::VatNetwork::new( + reader, + writer, + rpc_twoparty_capnp::Side::Client, + *ReaderOptions::new() + .nesting_limit(64) + .traversal_limit_in_words(Some(256 * 1024 * 1024)), + )); + + let mut rpc_system = RpcSystem::new(rpc_network, None); + + let hub: asset_hub::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + + let _disconnector = rpc_system.get_disconnector(); + local.spawn(rpc_system).detach(); + + log::trace!("Requesting RPC snapshot.."); + let response = hub.get_snapshot_request().send().promise.await?; + + let snapshot = response.get()?.get_snapshot()?; + log::trace!("Received snapshot, registering listener.."); + let (snapshot_tx, snapshot_rx) = unbounded(); + let listener: asset_hub::listener::Client = + capnp_rpc::new_client(ListenerImpl { + snapshot_channel: snapshot_tx, + snapshot_change: None, + }); + + let mut request = hub.register_listener_request(); + request.get().set_listener(listener); + let rpc_conn = request.send().promise.await.map(|_| RpcConnection { + snapshot, + snapshot_rx, + })?; + log::trace!("Registered listener, done connecting RPC loader."); - Ok(rpc_conn) - } - .await; - let _ = conn_tx.send(result); - }); + Ok(rpc_conn) + } + .await; + let _ = conn_tx.send(result); + }) + .detach(); self.connection = InternalConnectionState::Connecting(conn_rx) } @@ -183,8 +181,7 @@ impl RpcIO { Ok(RpcIO { connect_string, runtime: Mutex::new(RpcRuntime { - runtime: Builder::new_current_thread().enable_all().build()?, - local: tokio::task::LocalSet::new(), + local: Rc::new(async_executor::LocalExecutor::new()), connection: InternalConnectionState::None, }), requests: Default::default(), @@ -236,12 +233,12 @@ impl LoaderIO for RpcIO { Ok(conn) => InternalConnectionState::Connected(conn), Err(err) => InternalConnectionState::Error(err), }, - Err(oneshot::error::TryRecvError::Closed) => { + Err(distill_signal::error::TryRecvError::Closed) => { InternalConnectionState::Error(Box::new( - oneshot::error::TryRecvError::Closed, + distill_signal::error::TryRecvError::Closed, )) } - Err(oneshot::error::TryRecvError::Empty) => { + Err(distill_signal::error::TryRecvError::Empty) => { InternalConnectionState::Connecting(pending_connection) } } @@ -249,17 +246,10 @@ impl LoaderIO for RpcIO { c => c, }; - runtime - .local - .block_on(&runtime.runtime, tokio::task::yield_now()); + runtime.local.try_tick(); runtime.check_asset_changes(loader); } - - fn with_runtime(&self, f: &mut dyn FnMut(&tokio::runtime::Runtime)) { - let runtime = self.runtime.lock().unwrap(); - f(&runtime.runtime) - } } async fn do_metadata_request( @@ -342,46 +332,55 @@ fn process_requests(runtime: &mut RpcRuntime, requests: &mut QueuedRequests) { let len = requests.data_requests.len(); for asset in requests.data_requests.drain(0..len) { let snapshot = connection.snapshot.clone(); - runtime.local.spawn_local(async move { - match do_import_artifact_request(&asset, &snapshot).await { - Ok(data) => { - asset.complete(data); - } - Err(e) => { - asset.error(e); + runtime + .local + .spawn(async move { + match do_import_artifact_request(&asset, &snapshot).await { + Ok(data) => { + asset.complete(data); + } + Err(e) => { + asset.error(e); + } } - } - }); + }) + .detach(); } let len = requests.metadata_requests.len(); for m in requests.metadata_requests.drain(0..len) { let snapshot = 
connection.snapshot.clone(); - runtime.local.spawn_local(async move { - match do_metadata_request(&m, &snapshot).await { - Ok(data) => { - m.complete(data); - } - Err(e) => { - m.error(e); + runtime + .local + .spawn(async move { + match do_metadata_request(&m, &snapshot).await { + Ok(data) => { + m.complete(data); + } + Err(e) => { + m.error(e); + } } - } - }); + }) + .detach(); } let len = requests.resolve_requests.len(); for m in requests.resolve_requests.drain(0..len) { let snapshot = connection.snapshot.clone(); - runtime.local.spawn_local(async move { - match do_resolve_request(&m, &snapshot).await { - Ok(data) => { - m.complete(data); - } - Err(e) => { - m.error(e); + runtime + .local + .spawn(async move { + match do_resolve_request(&m, &snapshot).await { + Ok(data) => { + m.complete(data); + } + Err(e) => { + m.error(e); + } } - } - }); + }) + .detach(); } } } diff --git a/loader/src/task_local.rs b/loader/src/task_local.rs new file mode 100644 index 00000000..78aa5b51 --- /dev/null +++ b/loader/src/task_local.rs @@ -0,0 +1,186 @@ +use pin_project_lite::pin_project; +use std::cell::RefCell; +use std::error::Error; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt, thread}; + +// Reproduced from `tokio` (under MIT license) at https://github.com/tokio-rs/tokio/blob/9a3603fa75ff854e007d372061edf47cf8d02690/tokio/src/task/task_local.rs. + +/// Declares a new task-local key. +/// +/// # Syntax +/// +/// The macro wraps any number of static declarations and makes them local to the current task. +/// Publicity and attributes for each static are preserved. +/// +#[macro_export] +macro_rules! task_local { + // empty (base case for the recursion) + () => {}; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => { + $crate::__task_local_inner!($(#[$attr])* $vis $name, $t); + $crate::task_local!($($rest)*); + }; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => { + $crate::__task_local_inner!($(#[$attr])* $vis $name, $t); + } } + +#[doc(hidden)] +#[macro_export] +macro_rules! __task_local_inner { + ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => { + $vis static $name: $crate::task_local::LocalKey<$t> = { + std::thread_local! { + static __KEY: std::cell::RefCell> = std::cell::RefCell::new(None); + } + + $crate::task_local::LocalKey { inner: __KEY } + }; + }; } + +/// A key for task-local data. +/// +/// This type is generated by the `task_local!` macro. +/// +/// Unlike [`std::thread::LocalKey`], `LocalKey` will +/// _not_ lazily initialize the value on first access. Instead, the +/// value is first initialized when the future containing +/// the task-local is first polled by a futures executor. +pub struct LocalKey { + #[doc(hidden)] + pub inner: thread::LocalKey>>, +} + +impl LocalKey { + /// Sets a value `T` as the task-local value for the future `F`. + /// + /// On completion of `scope`, the task-local will be dropped. + pub async fn scope(&'static self, value: T, f: F) -> F::Output + where + F: Future, + { + TaskLocalFuture { + local: &self, + slot: Some(value), + future: f, + } + .await + } + + /// Accesses the current task-local and runs the provided closure. + /// + /// # Panics + /// + /// This function will panic if not called within the context + /// of a future containing a task-local with the corresponding key.
+ pub fn with(&'static self, f: F) -> R + where + F: FnOnce(&T) -> R, + { + self.try_with(f).expect( + "cannot access a Task Local Storage value \ + without setting it via `LocalKey::set`", + ) + } + + /// Accesses the current task-local and runs the provided closure. + /// + /// If the task-local with the associated key is not present, this + /// method will return an `AccessError`. For a panicking variant, + /// see `with`. + pub fn try_with(&'static self, f: F) -> Result + where + F: FnOnce(&T) -> R, + { + self.inner.with(|v| { + if let Some(val) = v.borrow().as_ref() { + Ok(f(val)) + } else { + Err(AccessError { _private: () }) + } + }) + } +} + +impl fmt::Debug for LocalKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("LocalKey { .. }") + } +} + +pin_project! { + struct TaskLocalFuture { + local: &'static LocalKey, + slot: Option, + #[pin] + future: F, + } +} + +impl TaskLocalFuture { + fn with_task) -> R, R>(self: Pin<&mut Self>, f: F2) -> R { + struct Guard<'a, T: 'static> { + local: &'static LocalKey, + slot: &'a mut Option, + prev: Option, + } + + impl Drop for Guard<'_, T> { + fn drop(&mut self) { + let value = self.local.inner.with(|c| c.replace(self.prev.take())); + *self.slot = value; + } + } + + let mut project = self.project(); + let val = project.slot.take(); + + let prev = project.local.inner.with(|c| c.replace(val)); + + let _guard = Guard { + prev, + slot: &mut project.slot, + local: *project.local, + }; + + f(project.future) + } +} + +impl Future for TaskLocalFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.with_task(|f| f.poll(cx)) + } +} + +// Required to make `pin_project` happy. +trait StaticLifetime: 'static {} +impl StaticLifetime for T {} + +/// An error returned by [`LocalKey::try_with`](method@LocalKey::try_with). 
+#[derive(Clone, Copy, Eq, PartialEq)] +pub struct AccessError { + _private: (), +} + +impl fmt::Debug for AccessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AccessError").finish() + } +} + +impl fmt::Display for AccessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt("task-local value not set", f) + } +} + +impl Error for AccessError {} diff --git a/src/lib.rs b/src/lib.rs index f34d8442..5bae48a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,9 @@ mod tests { sync::{Once, RwLock}, }; - use distill_core::{type_uuid, type_uuid::TypeUuid, AssetRef, AssetTypeId, AssetUuid}; + use distill_core::{ + distill_signal, type_uuid, type_uuid::TypeUuid, AssetRef, AssetTypeId, AssetUuid, + }; use distill_daemon::{init_logging, AssetDaemon}; use distill_importer::{ AsyncImporter, ImportOp, ImportedAsset, ImporterValue, Result as ImportResult, @@ -54,7 +56,9 @@ mod tests { }, LoadHandle, Loader, }; - use futures::{future::BoxFuture, io::AsyncReadExt, AsyncRead}; + use futures_core::future::BoxFuture; + use futures_io::AsyncRead; + use futures_util::io::AsyncReadExt; use serde::{Deserialize, Serialize}; use serial_test::serial; use uuid::Uuid; @@ -130,7 +134,7 @@ mod tests { }) .filter(|line| !line.is_empty()) .flat_map(|line| line.chars().chain(std::iter::once('\n'))); - String::from_iter(processed) + processed.collect() }) } } @@ -346,10 +350,7 @@ mod tests { fn spawn_daemon( daemon_address: &str, - ) -> ( - std::thread::JoinHandle<()>, - tokio::sync::oneshot::Sender, - ) { + ) -> (std::thread::JoinHandle<()>, distill_signal::Sender) { let daemon_address = daemon_address .parse() .expect("Failed to parse string as `SocketAddr`.");
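Finally, a reference sketch of the `distill_signal` oneshot that replaces `tokio::sync::oneshot` in these tests. The `oneshot`/`send`/`try_recv`/`TryRecvError` names are the ones exercised earlier in this patch; the payload type `bool` and the polling worker are assumptions for illustration:

use distill_core::distill_signal;

fn shutdown_handshake_demo() {
    let (tx, mut rx) = distill_signal::oneshot::<bool>();
    // Stand-in for a daemon thread that polls for a shutdown signal.
    let worker = std::thread::spawn(move || loop {
        match rx.try_recv() {
            // A delivered value or a dropped sender both end the loop.
            Ok(_) | Err(distill_signal::error::TryRecvError::Closed) => break,
            Err(distill_signal::error::TryRecvError::Empty) => {
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
        }
    });
    let _ = tx.send(true);
    worker.join().unwrap();
}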