Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for cancellable hotplug #110

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ slab = "0.4.9"
[dev-dependencies]
env_logger = "0.10.0"
futures-lite = "1.13.0"
ctrlc = "3.4.5"

[target.'cfg(any(target_os="linux", target_os="android"))'.dependencies]
rustix = { version = "0.38.17", features = ["fs", "event", "net"] }
Expand Down
25 changes: 25 additions & 0 deletions examples/hotplug_cancellable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use futures_lite::stream;
use std::thread;

fn main() {
env_logger::init();

let (stream_it, cancellation) = nusb::watch_devices_cancellable().unwrap();

ctrlc::set_handler(move || {
println!("ctrl-c, cancel hotplug");
cancellation.cancel();
})
.expect("Error setting Ctrl-C handler");

let join_handle = thread::spawn(move || {
for event in stream::block_on(stream_it) {
println!("{:#?}", event);
}
println!("exit thread");
});

join_handle.join().unwrap();

println!("main exit");
}
84 changes: 84 additions & 0 deletions src/cancellable_hotplug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//! Types for receiving notifications when USB devices are connected or
//! disconnected from the system.
//!
//! See [`super::watch_devices_cancellable`] for a usage example.

use atomic_waker::AtomicWaker;
use futures_core::Stream;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::task::Poll;

use crate::hotplug::HotplugEvent;

struct HotplugCancellationTokenInner {
waker: AtomicWaker,
cancelled: AtomicBool,
}

/// Cancellation token
///
/// Call cancel() once hotplug events needs to be stopped
#[derive(Clone)]
pub struct HotplugCancellationToken(Arc<HotplugCancellationTokenInner>);

/// Stream of device connection / disconnection events.
///
/// Call [`super::watch_devices`] to begin watching device
/// events and create a `HotplugWatch`.
pub struct CancellableHotplugWatch {
platform: crate::platform::HotplugWatch,
cancellation: HotplugCancellationToken,
}

impl Stream for CancellableHotplugWatch {
type Item = HotplugEvent;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.cancellation.0.waker.register(cx.waker());
if self
.cancellation
.0
.cancelled
.load(std::sync::atomic::Ordering::Relaxed)
{
return Poll::Ready(None);
}

self.platform.poll_next(cx).map(Some)
}
}

impl HotplugCancellationToken {
fn new() -> Self {
Self(Arc::new(HotplugCancellationTokenInner {
waker: AtomicWaker::new(),
cancelled: AtomicBool::new(false),
}))
}

/// Cancel lazily hotplug events
pub fn cancel(&self) {
self.0
.cancelled
.store(true, std::sync::atomic::Ordering::Relaxed);
self.0.waker.wake();
}
}

impl CancellableHotplugWatch {
/// Create new CancellableHotplugWatch with HotplugCancellationToken
pub fn new() -> Result<(Self, HotplugCancellationToken), crate::Error> {
let token = HotplugCancellationToken::new();
Ok((
Self {
platform: crate::platform::HotplugWatch::new()?,
cancellation: token.clone(),
},
token,
))
}
}
13 changes: 13 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub use device::{Device, Interface};

pub mod transfer;

pub mod cancellable_hotplug;
pub mod hotplug;

/// OS error returned from operations other than transfers.
Expand Down Expand Up @@ -209,3 +210,15 @@ pub fn list_buses() -> Result<impl Iterator<Item = BusInfo>, Error> {
pub fn watch_devices() -> Result<hotplug::HotplugWatch, Error> {
Ok(hotplug::HotplugWatch(platform::HotplugWatch::new()?))
}

/// Returns CancellableHotplugWatch and HotplugCancellationToken.
///
pub fn watch_devices_cancellable() -> Result<
(
cancellable_hotplug::CancellableHotplugWatch,
cancellable_hotplug::HotplugCancellationToken,
),
Error,
> {
cancellable_hotplug::CancellableHotplugWatch::new()
}
Loading