From b96a61ea0b0443af00567c1df6540f960d8db3bd Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Tue, 7 Jan 2025 13:45:24 -0800 Subject: [PATCH] plugin: track resources used per-job Problem: The priority plugin does not keep track of the resources allocated per-job when an association submits a job, which will be needed when the plugin begins to enforce limits on how many resources an association can use across all of their running jobs. Add an increment/decrement of node+core counts in job.state.run and job.state.inactive by unpacking the jobspec of the job and calculating the number of nodes and cores assigned with the job. Increment and decrement these values in the "cur_nodes" and "cur_cores" attributes of the Association object. --- src/plugins/Makefile.am | 2 +- src/plugins/mf_priority.cpp | 99 +++++++++++++++++++++++++++++++++++-- 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index 35576e034..4ea616e9c 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -8,6 +8,6 @@ jobtapdir = \ $(fluxlibdir)/job-manager/plugins/ jobtap_LTLIBRARIES = mf_priority.la -mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp +mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp jj.cpp mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 36b5853cf..3500ec8f6 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -32,6 +32,8 @@ extern "C" { // custom bank_info class file #include "accounting.hpp" +// custom job resource counting file +#include "jj.hpp" // the plugin does not know about the association who submitted a job and will // assign default values to the association until it receives information from @@ -198,6 +200,42 @@ static void add_special_association (flux_plugin_t *p, flux_t *h, int userid) } +/* + * Using the jobspec from a job, increment the cur_nodes and cur_cores counts + * for an association. + */ +static int increment_resources (Association *b, json_t *jobspec) +{ + struct jj_counts counts; + + if (jj_get_counts_json (jobspec, &counts) < 0) + return -1; + + b->cur_nodes = b->cur_nodes + counts.nnodes; + b->cur_cores = b->cur_cores + (counts.nslots * counts.slot_size); + + return 0; +} + + +/* + * Using the jobspec from a job, decrement the cur_nodes and cur_cores counts + * for an association. + */ +static int decrement_resources (Association *b, json_t *jobspec) +{ + struct jj_counts counts; + + if (jj_get_counts_json (jobspec, &counts) < 0) + return -1; + + b->cur_nodes = b->cur_nodes - counts.nnodes; + b->cur_cores = b->cur_cores - (counts.nslots * counts.slot_size); + + return 0; +} + + /****************************************************************************** * * * Callbacks * @@ -883,6 +921,19 @@ static int run_cb (flux_plugin_t *p, { int userid; Association *b; + json_t *jobspec = NULL; + + flux_t *h = flux_jobtap_get_flux (p); + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:o}", + "jobspec", &jobspec) < 0) { + flux_log (h, + LOG_ERR, + "flux_plugin_arg_unpack: %s", + flux_plugin_arg_strerror (args)); + return -1; + } b = static_cast (flux_jobtap_job_aux_get (p, @@ -897,8 +948,27 @@ static int run_cb (flux_plugin_t *p, return -1; } - // increment the user's current running jobs count + // increment the user's current running jobs and resources counts b->cur_run_jobs++; + if (jobspec == NULL) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "job.state.run: failed to unpack " \ + "jobspec"); + return -1; + } else { + if (increment_resources (b, jobspec) < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "job.state.run: failed to increment " \ + "resource count"); + return -1; + } + } return 0; } @@ -1099,12 +1169,14 @@ static int inactive_cb (flux_plugin_t *p, { int userid; Association *b; + json_t *jobspec = NULL; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i}", - "userid", &userid) < 0) { + "{s:i, s:o}", + "userid", &userid, + "jobspec", &jobspec) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -1131,8 +1203,27 @@ static int inactive_cb (flux_plugin_t *p, return 0; // this job was running, so decrement the current running jobs count - // and look to see if any held jobs can be released + // and the resources count and look to see if any held jobs can be released b->cur_run_jobs--; + if (jobspec == NULL) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "job.state.inactive: failed to " \ + "unpack jobspec"); + return -1; + } else { + if (decrement_resources (b, jobspec) < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "job.state.inactive: failed to " \ + "decrement resource count"); + return -1; + } + } // if the user/bank combo has any currently held jobs and the user is now // under their max jobs limit, remove the dependency from first held job