Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce KillSignal for sending signals to tasks #131

Merged
merged 8 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,25 @@ description = "Cross platform scripting for deno task"

[features]
default = ["shell"]
shell = ["futures", "glob", "os_pipe", "path-dedot", "tokio", "tokio-util"]
shell = ["futures", "glob", "nix", "os_pipe", "path-dedot", "tokio"]
serialization = ["serde"]

[dependencies]
anyhow = "1.0.75"
futures = { version = "0.3.29", optional = true }
glob = { version = "0.3.1", optional = true }
path-dedot = { version = "3.1.1", optional = true }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "process", "rt-multi-thread", "sync", "time"], optional = true }
tokio-util = { version = "0.7.10", optional = true }
os_pipe = { version = "1.1.4", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
monch = "0.5.0"
thiserror = "1.0.58"
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "process", "rt-multi-thread", "sync", "time"], optional = true }

[target.'cfg(unix)'.dependencies]
nix = { version = "0.27.1", features = ["signal"], optional = true }

[dev-dependencies]
deno_unsync = "0.4.1"
parking_lot = "0.12.1"
pretty_assertions = "1"
serde_json = "1.0.111"
Expand Down
4 changes: 2 additions & 2 deletions src/shell/commands/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ fn execute_cat(mut context: ShellCommandContext) -> Result<ExecuteResult> {
// in memory
match File::open(context.state.cwd().join(&path)) {
Ok(mut file) => loop {
if context.state.token().is_cancelled() {
return Ok(ExecuteResult::for_cancellation());
if let Some(exit_code) = context.state.kill_signal().aborted_code() {
return Ok(ExecuteResult::from_exit_code(exit_code));
}

let size = file.read(&mut buf)?;
Expand Down
8 changes: 4 additions & 4 deletions src/shell/commands/cp_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl ShellCommand for CpCommand {
async move {
execute_with_cancellation!(
cp_command(context.state.cwd(), context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand Down Expand Up @@ -171,7 +171,7 @@ impl ShellCommand for MvCommand {
async move {
execute_with_cancellation!(
mv_command(context.state.cwd(), context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand All @@ -184,10 +184,10 @@ async fn mv_command(
mut stderr: ShellPipeWriter,
) -> ExecuteResult {
match execute_mv(cwd, args).await {
Ok(()) => ExecuteResult::Continue(0, Vec::new(), Vec::new()),
Ok(()) => ExecuteResult::from_exit_code(0),
Err(err) => {
let _ = stderr.write_line(&format!("mv: {err}"));
ExecuteResult::Continue(1, Vec::new(), Vec::new())
ExecuteResult::from_exit_code(1)
}
}
}
Expand Down
58 changes: 43 additions & 15 deletions src/shell/commands/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ impl ShellCommand for ExecutableCommand {
let display_name = self.display_name.clone();
let command_name = self.command_path.clone();
async move {
// don't spawn if already aborted
if let Some(exit_code) = context.state.kill_signal().aborted_code() {
return ExecuteResult::from_exit_code(exit_code);
}

let mut stderr = context.stderr;
let mut sub_command = tokio::process::Command::new(&command_name);
let child = sub_command
Expand All @@ -48,31 +53,54 @@ impl ShellCommand for ExecutableCommand {
"Error launching '{}': {}",
display_name, err
));
return ExecuteResult::Continue(1, Vec::new(), Vec::new());
return ExecuteResult::from_exit_code(1);
}
};

// avoid deadlock since this is holding onto the pipes
drop(sub_command);

tokio::select! {
result = child.wait() => match result {
Ok(status) => ExecuteResult::Continue(
status.code().unwrap_or(1),
Vec::new(),
Vec::new(),
),
Err(err) => {
let _ = stderr.write_line(&format!("{}", err));
ExecuteResult::Continue(1, Vec::new(), Vec::new())
loop {
tokio::select! {
result = child.wait() => match result {
Ok(status) => return ExecuteResult::Continue(
status.code().unwrap_or(1),
Vec::new(),
Vec::new(),
),
Err(err) => {
let _ = stderr.write_line(&format!("{}", err));
return ExecuteResult::from_exit_code(1);
}
},
signal = context.state.kill_signal().wait_any() => {
if let Some(_id) = child.id() {
#[cfg(unix)]
kill(_id as i32, signal);
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved

if cfg!(not(unix)) && signal.causes_abort() {
let _ = child.start_kill();
let status = child.wait().await.ok();
return ExecuteResult::from_exit_code(
status.and_then(|s| s.code()).unwrap_or(signal.aborted_code()),
);
}
}
}
},
_ = context.state.token().cancelled() => {
let _ = child.kill().await;
ExecuteResult::for_cancellation()
}
}
}
.boxed_local()
}
}

#[cfg(unix)]
pub fn kill(pid: i32, signal: crate::SignalKind) -> Option<()> {
use nix::sys::signal::kill as unix_kill;
use nix::sys::signal::Signal;
use nix::unistd::Pid;
let signo: i32 = signal.into();
let sig = Signal::try_from(signo).ok()?;
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
unix_kill(Pid::from_raw(pid), Some(sig)).ok()?;
Some(())
}
18 changes: 9 additions & 9 deletions src/shell/commands/head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use anyhow::bail;
use anyhow::Result;
use futures::future::LocalBoxFuture;

use crate::shell::CancellationToken;
use crate::shell::KillSignal;
use crate::ExecuteResult;
use crate::ShellCommand;
use crate::ShellCommandContext;
Expand Down Expand Up @@ -38,23 +38,23 @@ impl ShellCommand for HeadCommand {
fn copy_lines<F: FnMut(&mut [u8]) -> Result<usize>>(
writer: &mut ShellPipeWriter,
max_lines: u64,
cancellation_token: &CancellationToken,
kill_signal: &KillSignal,
mut read: F,
buffer_size: usize,
) -> Result<ExecuteResult> {
let mut written_lines = 0;
let mut buffer = vec![0; buffer_size];
while written_lines < max_lines {
if cancellation_token.is_cancelled() {
return Ok(ExecuteResult::for_cancellation());
if let Some(exit_code) = kill_signal.aborted_code() {
return Ok(ExecuteResult::from_exit_code(exit_code));
}
let read_bytes = read(&mut buffer)?;
if read_bytes == 0 {
break;
}

if cancellation_token.is_cancelled() {
return Ok(ExecuteResult::for_cancellation());
if let Some(exit_code) = kill_signal.aborted_code() {
return Ok(ExecuteResult::from_exit_code(exit_code));
}

let mut written_bytes: usize = 0;
Expand Down Expand Up @@ -85,7 +85,7 @@ fn execute_head(mut context: ShellCommandContext) -> Result<ExecuteResult> {
copy_lines(
&mut context.stdout,
flags.lines,
context.state.token(),
context.state.kill_signal(),
|buf| context.stdin.read(buf),
512,
)
Expand All @@ -95,7 +95,7 @@ fn execute_head(mut context: ShellCommandContext) -> Result<ExecuteResult> {
Ok(mut file) => copy_lines(
&mut context.stdout,
flags.lines,
context.state.token(),
context.state.kill_signal(),
|buf| file.read(buf).map_err(Into::into),
512,
),
Expand Down Expand Up @@ -174,7 +174,7 @@ mod test {
let result = copy_lines(
&mut writer,
2,
&CancellationToken::new(),
&KillSignal::default(),
|buffer| {
if offset >= data.len() {
return Ok(0);
Expand Down
6 changes: 3 additions & 3 deletions src/shell/commands/mkdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl ShellCommand for MkdirCommand {
async move {
execute_with_cancellation!(
mkdir_command(context.state.cwd(), context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand All @@ -38,10 +38,10 @@ async fn mkdir_command(
mut stderr: ShellPipeWriter,
) -> ExecuteResult {
match execute_mkdir(cwd, args).await {
Ok(()) => ExecuteResult::Continue(0, Vec::new(), Vec::new()),
Ok(()) => ExecuteResult::from_exit_code(0),
Err(err) => {
let _ = stderr.write_line(&format!("mkdir: {err}"));
ExecuteResult::Continue(1, Vec::new(), Vec::new())
ExecuteResult::from_exit_code(1)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/shell/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ pub trait ShellCommand {
}

macro_rules! execute_with_cancellation {
($result_expr:expr, $token:expr) => {
($result_expr:expr, $kill_signal:expr) => {
tokio::select! {
result = $result_expr => {
result
},
_ = $token.cancelled() => {
ExecuteResult::for_cancellation()
signal = $kill_signal.wait_aborted() => {
ExecuteResult::from_exit_code(signal.aborted_code())
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/shell/commands/rm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl ShellCommand for RmCommand {
async move {
execute_with_cancellation!(
rm_command(context.state.cwd(), context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand Down
2 changes: 1 addition & 1 deletion src/shell/commands/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl ShellCommand for SleepCommand {
async move {
execute_with_cancellation!(
sleep_command(context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand Down
Loading