Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update multi-user signaling to track flux-security 0.13.0 IMP changes #6408

Merged
merged 7 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ AS_IF([test x$enable_code_coverage = xyes], [
AC_ARG_WITH([flux-security], AS_HELP_STRING([--with-flux-security],
[Build with flux-security]))
AS_IF([test "x$with_flux_security" = "xyes"], [
PKG_CHECK_MODULES([FLUX_SECURITY], [flux-security >= 0.9.0],
PKG_CHECK_MODULES([FLUX_SECURITY], [flux-security >= 0.13.0],
[flux_sec_incdir=`$PKG_CONFIG --variable=includedir flux-security`],
[flux_sec_incdir=;])
AS_IF([test "x$flux_sec_incdir" = x],
Expand Down
6 changes: 1 addition & 5 deletions doc/man1/flux-exec.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,7 @@ OPTIONS

Prepend the full path to :program:`flux-imp run` to *COMMAND*. This option
is mostly meant for testing or as a convenience to execute a configured
``prolog`` or ``epilog`` command under the IMP. Note: When this option is
used, or if :program:`flux-imp` is detected as the first argument of
*COMMAND*, :program:`flux exec` will use :program:`flux-imp kill` to
signal remote commands instead of the normal builtin subprocess signaling
mechanism.
``prolog`` or ``epilog`` command under the IMP.

CAVEATS
=======
Expand Down
67 changes: 13 additions & 54 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,68 +335,27 @@ static void stdin_cb (flux_reactor_t *r,
}
}

static void kill_completion_cb (flux_subprocess_t *p)
{
flux_subprocess_destroy (p);
}

static flux_subprocess_t *imp_kill (flux_subprocess_t *p, int signum)
{
flux_cmd_t *cmd;
flux_subprocess_ops_t ops = {
.on_completion = kill_completion_cb,
.on_state_change = NULL,
.on_channel_out = NULL,
.on_stdout = output_cb,
.on_stderr = output_cb,
};

pid_t pid = flux_subprocess_pid (p);
int rank = flux_subprocess_rank (p);

if (!(cmd = flux_cmd_create (0, NULL, environ))
|| flux_cmd_argv_append (cmd, imp_path) < 0
|| flux_cmd_argv_append (cmd, "kill") < 0
|| flux_cmd_argv_appendf (cmd, "%d", signum) < 0
|| flux_cmd_argv_appendf (cmd, "-%ld", (long) pid) < 0) {
fprintf (stderr,
"Failed to create flux-imp kill command for rank %d pid %d\n",
rank, pid);
return NULL;
}
/* Note: subprocess object destroyed in completion callback
*/
return flux_rexec (flux_handle,
rank,
FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF,
cmd,
&ops);
}

static void killall (zlistx_t *l, int signum)
{
flux_subprocess_t *p = zlistx_first (l);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (use_imp) {
if (!imp_kill (p, signum))
/* RFC 15 states that the IMP will treat SIGUSR1 as a surrogate
* for SIGKILL.
*/
if (use_imp && signum == SIGKILL)
signum = SIGUSR1;

flux_future_t *f = flux_subprocess_kill (p, signum);
if (!f) {
if (optparse_getopt (opts, "verbose", NULL) > 0)
fprintf (stderr,
"failed to signal rank %d: %s\n",
flux_subprocess_rank (p),
strerror (errno));
}
else {
flux_future_t *f = flux_subprocess_kill (p, signum);
if (!f) {
if (optparse_getopt (opts, "verbose", NULL) > 0)
fprintf (stderr,
"failed to signal rank %d: %s\n",
flux_subprocess_rank (p),
strerror (errno));
}
/* don't care about response */
flux_future_destroy (f);
flux_subprocess_rank (p),
strerror (errno));
}
/* don't care about response */
flux_future_destroy (f);
}
p = zlistx_next (l);
}
Expand Down
182 changes: 4 additions & 178 deletions src/common/libsubprocess/bulk-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ struct bulk_exec {
char *service;
flux_jobid_t id;
char *name;
char *imp_path; /* Path to IMP. If set, use IMP for kill */

struct aux_item *aux;

Expand Down Expand Up @@ -504,7 +503,6 @@ void bulk_exec_destroy (struct bulk_exec *exec)
aux_destroy (&exec->aux);
free (exec->name);
free (exec->service);
free (exec->imp_path);
free (exec);
errno = saved_errno;
}
Expand Down Expand Up @@ -553,19 +551,6 @@ int bulk_exec_set_max_per_loop (struct bulk_exec *exec, int max)
return 0;
}

int bulk_exec_set_imp_path (struct bulk_exec *exec,
const char *imp_path)
{
if (!exec || !imp_path) {
errno = EINVAL;
return -1;
}
free (exec->imp_path);
if (!(exec->imp_path = strdup (imp_path)))
return -1;
return 0;
}

int bulk_exec_push_cmd (struct bulk_exec *exec,
const struct idset *ranks,
flux_cmd_t *cmd,
Expand Down Expand Up @@ -683,14 +668,14 @@ void bulk_exec_kill_log_error (flux_future_t *f, flux_jobid_t id)
}
}

static flux_future_t *bulk_exec_simple_kill (struct bulk_exec *exec,
const struct idset *ranks,
int signum)
flux_future_t *bulk_exec_kill (struct bulk_exec *exec,
const struct idset *ranks,
int signum)
{
flux_subprocess_t *p;
flux_future_t *cf;

if (!exec) {
if (!exec || signum < 0) {
errno = EINVAL;
return NULL;
}
Expand Down Expand Up @@ -738,165 +723,6 @@ static flux_future_t *bulk_exec_simple_kill (struct bulk_exec *exec,
return cf;
}

static void imp_kill_output (struct bulk_exec *kill,
flux_subprocess_t *p,
const char *stream,
const char *data,
int len,
void *arg)
{
int rank = flux_subprocess_rank (p);
flux_log (kill->h,
LOG_INFO,
"%s (rank %d): imp kill: %.*s",
flux_get_hostbyrank (kill->h, rank),
rank,
len,
data);
}

static void imp_kill_complete (struct bulk_exec *kill, void *arg)
{
flux_future_t *f = arg;
if (bulk_exec_rc (kill) < 0)
flux_future_fulfill_error (f, 0, NULL);
else
flux_future_fulfill (f, NULL, NULL);
}

static void imp_kill_error (struct bulk_exec *kill,
flux_subprocess_t *p,
void *arg)
{
int rank = flux_subprocess_rank (p);
flux_log_error (kill->h,
"imp kill on %s (rank %d) failed",
flux_get_hostbyrank (kill->h, rank),
rank);
}


struct bulk_exec_ops imp_kill_ops = {
.on_output = imp_kill_output,
.on_error = imp_kill_error,
.on_complete = imp_kill_complete,
};

static int bulk_exec_push_one (struct bulk_exec *exec,
int rank,
flux_cmd_t *cmd,
int flags)
{
int rc = -1;
struct idset *ids = idset_create (0, IDSET_FLAG_AUTOGROW);
if (!ids || idset_set (ids, rank) < 0)
return -1;
rc = bulk_exec_push_cmd (exec, ids, cmd, flags);
idset_destroy (ids);
return rc;
}

/* Kill all currently executing processes in bulk-exec object `exec`
* using "flux-imp kill" helper for processes potentially running
* under a different userid.
*
* Spawns "flux-imp kill <signal> <pid>" on each rank.
*/
flux_future_t *bulk_exec_imp_kill (struct bulk_exec *exec,
const char *imp_path,
const struct idset *ranks,
int signum)
{
struct bulk_exec *killcmd = NULL;
flux_subprocess_t *p = NULL;
flux_future_t *f = NULL;
int count = 0;

if (!exec || !imp_path) {
errno = EINVAL;
return NULL;
}

/* Empty future for return value
*/
if (!(f = flux_future_create (NULL, NULL))) {
flux_log_error (exec->h, "bulk_exec_imp_kill: future_create");
goto err;
}
flux_future_set_flux (f, exec->h);

if (!(killcmd = bulk_exec_create (&imp_kill_ops,
exec->service,
exec->id,
"imp-kill",
f)))
return NULL;

/* Tie bulk exec object destruction to future */
flux_future_aux_set (f, NULL, killcmd, (flux_free_f) bulk_exec_destroy);

p = zlist_first (exec->processes);
while (p) {
if ((!ranks || idset_test (ranks, flux_subprocess_rank (p)))
&& (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT)) {

pid_t pid = flux_subprocess_pid (p);
int rank = flux_subprocess_rank (p);
flux_cmd_t *cmd = flux_cmd_create (0, NULL, environ);

if (!cmd
|| flux_cmd_argv_append (cmd, imp_path) < 0
|| flux_cmd_argv_append (cmd, "kill") < 0
|| flux_cmd_argv_appendf (cmd, "%d", signum) < 0
|| flux_cmd_argv_appendf (cmd, "%ld", (long) pid)) {
flux_log_error (exec->h,
"bulk_exec_imp_kill: flux_cmd_argv_append");
goto err;
}

if (bulk_exec_push_one (killcmd, rank, cmd, 0) < 0) {
flux_log_error (exec->h, "bulk_exec_imp_kill: push_cmd");
goto err;
}

count++;
flux_cmd_destroy (cmd);
}
p = zlist_next (exec->processes);
}

if (count == 0) {
errno = ENOENT;
goto err;
}

bulk_exec_aux_set (killcmd, "future", f, NULL);

if (bulk_exec_start (exec->h, killcmd) < 0) {
flux_log_error (exec->h, "bulk_exec_start");
goto err;
}

return f;
err:
flux_future_destroy (f);
return NULL;
}

flux_future_t *bulk_exec_kill (struct bulk_exec *exec,
const struct idset *ranks,
int signum)
{
if (!exec || signum < 0) {
errno = EINVAL;
return NULL;
}
if (exec->imp_path)
return bulk_exec_imp_kill (exec, exec->imp_path, ranks, signum);
return bulk_exec_simple_kill (exec, ranks, signum);
}

int bulk_exec_aux_set (struct bulk_exec *exec,
const char *key,
void *val,
Expand Down
5 changes: 0 additions & 5 deletions src/common/libsubprocess/bulk-exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ int bulk_exec_aux_set (struct bulk_exec *exec,
*/
int bulk_exec_set_max_per_loop (struct bulk_exec *exec, int max);

/* Set path to an IMP to use for bulk_exec_kill().
*/
int bulk_exec_set_imp_path (struct bulk_exec *exec,
const char *imp_path);

void bulk_exec_destroy (struct bulk_exec *exec);

int bulk_exec_push_cmd (struct bulk_exec *exec,
Expand Down
4 changes: 0 additions & 4 deletions src/common/libsubprocess/test/bulk-exec-einval.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@ int main (int argc, char *argv[])
"bulk_exec_aux_set (NULL, ..) returns EINVAL");
ok (bulk_exec_set_max_per_loop (NULL, 1) < 0 && errno == EINVAL,
"bulk_exec_set_max_per_loop (NULL, 1) returns EINVAL");
ok (bulk_exec_set_imp_path (NULL, NULL) < 0 && errno == EINVAL,
"bulk_exec_set_imp_path (NULL, NULL) returns EINVAL");
ok (bulk_exec_push_cmd (NULL, NULL, NULL, 0) < 0 && errno == EINVAL,
"bulk_exec_push_cmd (NULL, ...) returns EINVAL");
ok (bulk_exec_start (NULL, NULL) < 0 && errno == EINVAL,
"bulk_exec_start (NULL, NULL) returns EINVAL");
ok (bulk_exec_kill (NULL, NULL, 0) == NULL && errno == EINVAL,
"bulk_exec_kill (NULL, NULL, 0) returns EINVAL");
ok (bulk_exec_imp_kill (NULL, NULL, NULL, 0) == NULL && errno == EINVAL,
"bulk_exec_imp_kill (NULL, NULL, NULL, 0) returns EINVAL");
ok (bulk_exec_cancel (NULL) < 0 && errno == EINVAL,
"bulk_exec_cancel (NULL) returns EINVAL");
ok (bulk_exec_rc (NULL) < 0 && errno == EINVAL,
Expand Down
4 changes: 0 additions & 4 deletions src/modules/job-exec/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,6 @@ static int exec_init (struct jobinfo *job)
flux_log_error (job->h, "exec_init: bulk_exec_create");
goto err;
}
if (job->multiuser && bulk_exec_set_imp_path (exec, imp_path)) {
flux_log_error (job->h, "exec_ctx_create: bulk_exec_set_imp_path");
goto err;
}
if (!(ctx = exec_ctx_create (job, ranks, &error))) {
flux_log (job->h, LOG_ERR, "exec_ctx_create: %s", error.text);
goto err;
Expand Down
14 changes: 11 additions & 3 deletions src/modules/job-exec/job-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,21 @@ static void kill_shell_timer_cb (flux_reactor_t *r,
{
struct jobinfo *job = arg;
struct idset *active_ranks;
int actual_kill_signal = kill_signal;

/* RFC 15 states that the IMP handles SIGUSR1 by sending SIGKILL to
* the entire cgroup. Sending SIGKILL to the IMP is not productive.
*/
if (job->multiuser)
actual_kill_signal = SIGUSR1;

flux_log (job->h,
LOG_DEBUG,
"Sending %s to job shell for job %s",
sigutil_signame (kill_signal),
"Sending %s to %s for job %s",
sigutil_signame (actual_kill_signal),
job->multiuser ? "IMP" : "job shell",
idf58 (job->id));
(*job->impl->kill) (job, kill_signal);
(*job->impl->kill) (job, actual_kill_signal);
job->kill_shell_count++;

/* Since we've transitioned to killing the shell directly, stop the
Expand Down
2 changes: 0 additions & 2 deletions src/modules/job-manager/housekeeping.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,6 @@ static struct allocation *allocation_create (struct housekeeping *hk,
id,
"housekeeping",
a))
|| (hk->imp_path
&& bulk_exec_set_imp_path (a->bulk_exec, hk->imp_path) < 0)
|| update_cmd_env (hk->cmd, id, userid) < 0
|| bulk_exec_push_cmd (a->bulk_exec, a->pending, hk->cmd, 0) < 0) {
allocation_destroy (a);
Expand Down
Loading
Loading