diff --git a/src/Makefile.am b/src/Makefile.am index 4863481e..7dc01d1e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -18,7 +18,8 @@ noinst_HEADERS = \ fairness/writer/data_writer_base.hpp \ fairness/writer/data_writer_db.hpp \ fairness/writer/data_writer_stdout.hpp \ - plugins/accounting.hpp + plugins/accounting.hpp \ + plugins/jj.hpp fairness_libweighted_tree_la_SOURCES = \ fairness/account/account.cpp \ diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index 35576e03..4ea616e9 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -8,6 +8,6 @@ jobtapdir = \ $(fluxlibdir)/job-manager/plugins/ jobtap_LTLIBRARIES = mf_priority.la -mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp +mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp jj.cpp mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp index 7b463a74..864a3413 100644 --- a/src/plugins/accounting.cpp +++ b/src/plugins/accounting.cpp @@ -86,7 +86,8 @@ json_t* Association::to_json () const // 'o' steals the reference for both held_job_ids and user_queues json_t *u = json_pack ("{s:s, s:f, s:i, s:i, s:i, s:i, s:o," - " s:o, s:i, s:o, s:s, s:i, s:i, s:i}", + " s:o, s:i, s:o, s:s, s:i, s:i, s:i," + " s:i, s:i}", "bank_name", bank_name.c_str (), "fairshare", fairshare, "max_run_jobs", max_run_jobs, @@ -100,6 +101,8 @@ json_t* Association::to_json () const "def_project", def_project.c_str (), "max_nodes", max_nodes, "max_cores", max_cores, + "cur_nodes", cur_nodes, + "cur_cores", cur_cores, "active", active); if (!u) diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp index f83884db..8e9b9c47 100644 --- a/src/plugins/accounting.hpp +++ b/src/plugins/accounting.hpp @@ -46,6 +46,8 @@ class Association { std::string def_project; // default project int max_nodes; // max num nodes across all running jobs int max_cores; // max num cores 
across all running jobs + int cur_nodes; // current number of used nodes + int cur_cores; // current number of used cores // methods json_t* to_json () const; // convert object to JSON string diff --git a/src/plugins/jj.cpp b/src/plugins/jj.cpp new file mode 100644 index 00000000..971b78ac --- /dev/null +++ b/src/plugins/jj.cpp @@ -0,0 +1,165 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +extern "C" { +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include <errno.h> +#include <string.h> +#include <jansson.h> + +#include "jj.hpp" + +static int jj_read_level (json_t *o, int level, struct jj_counts *jj); + +static int jj_read_vertex (json_t *o, int level, struct jj_counts *jj) +{ + int count; + const char *type = NULL; + json_t *with = NULL; + json_error_t error; + int exclusive = 0; + + if (json_unpack_ex (o, &error, 0, "{ s:s s:i s?b s?o }", + "type", &type, + "count", &count, + "exclusive", &exclusive, + "with", &with) < 0) { + snprintf (jj->error, sizeof (jj->error) - 1, + "level %d: %s", level, error.text); + errno = EINVAL; + return -1; + } + if (count <= 0) { + snprintf (jj->error, sizeof (jj->error) - 1, + "Invalid count %d for type '%s'", count, type); + errno = EINVAL; + return -1; + } + if (streq (type, "node")) { + jj->nnodes = count; + if (exclusive) + jj->exclusive = true; + } + else if (streq (type, "slot")) + jj->nslots = count; + else if (streq (type, "core")) + jj->slot_size = count; + else if (streq (type, "gpu")) + jj->slot_gpus = count; + else { + snprintf (jj->error, sizeof (jj->error) - 1, "Unsupported resource type '%s'", type); + errno = EINVAL; + return -1; + } + if (with) + return jj_read_level (with, level+1, jj); + return 0; + +} + +static int jj_read_level (json_t *o,
int level, struct jj_counts *jj) +{ + int i; + json_t *v = NULL; + + if (!json_is_array (o)) { + snprintf (jj->error, sizeof (jj->error) - 1, + "level %d: must be an array", level); + errno = EINVAL; + return -1; + } + json_array_foreach (o, i, v) { + if (jj_read_vertex (v, level, jj) < 0) + return -1; + } + return 0; +} + +int jj_get_counts (const char *spec, struct jj_counts *jj) +{ + json_t *o = NULL; + json_error_t error; + int rc = -1; + + if ((o = json_loads (spec, 0, &error)) == NULL) { + snprintf (jj->error, sizeof (jj->error) - 1, + "JSON load: %s", error.text); + errno = EINVAL; + return -1; + } + + rc = jj_get_counts_json (o, jj); + json_decref (o); + return rc; +} + +int jj_get_counts_json (json_t *jobspec, struct jj_counts *jj) +{ + int version; + json_t *resources = NULL; + json_error_t error; + + if (!jj) { + errno = EINVAL; + return -1; + } + memset (jj, 0, sizeof (*jj)); + + if (json_unpack_ex (jobspec, &error, 0, "{s:i s:o}", + "version", &version, + "resources", &resources) < 0) { + snprintf (jj->error, sizeof (jj->error) - 1, + "at top level: %s", error.text); + errno = EINVAL; + return -1; + } + if (version != 1) { + snprintf (jj->error, sizeof (jj->error) - 1, + "Invalid version: expected 1, got %d", version); + errno = EINVAL; + return -1; + } + /* N.B. 
attributes.system is generally optional, but + * attributes.system.duration is required in jobspec version 1 */ + if (json_unpack_ex (jobspec, &error, 0, "{s:{s:{s:F}}}", + "attributes", + "system", + "duration", &jj->duration) < 0) { + snprintf (jj->error, sizeof (jj->error) - 1, + "at top level: getting duration: %s", error.text); + errno = EINVAL; + return -1; + } + if (jj_read_level (resources, 0, jj) < 0) + return -1; + + if (jj->nslots <= 0) { + snprintf (jj->error, sizeof (jj->error) - 1, + "Unable to determine slot count"); + errno = EINVAL; + return -1; + } + if (jj->slot_size <= 0) { + snprintf (jj->error, sizeof (jj->error) - 1, + "Unable to determine slot size"); + errno = EINVAL; + return -1; + } + if (jj->nnodes) + jj->nslots *= jj->nnodes; + return 0; +} + +} +/* vi: ts=4 sw=4 expandtab + */ diff --git a/src/plugins/jj.hpp b/src/plugins/jj.hpp new file mode 100644 index 00000000..b4949762 --- /dev/null +++ b/src/plugins/jj.hpp @@ -0,0 +1,62 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +extern "C" { +#ifndef HAVE_JJ_H +#define HAVE_JJ_H 1 + +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include <string.h> +#include <jansson.h> + +#define JJ_ERROR_TEXT_LENGTH 256 + +/** + * streq - Are two strings equal? + * @a: first string + * @b: second string + * + * This macro is arguably more readable than "!strcmp(a, b)".
+ * + * Example: + * if (streq(somestring, "")) + * printf("String is empty!\n"); + */ +#define streq(a,b) (strcmp((a),(b)) == 0) + +struct jj_counts { + int nnodes; /* total number of nodes requested */ + int nslots; /* total number of slots requested */ + int slot_size; /* number of cores per slot */ + int slot_gpus; /* number of gpus per slot */ + + bool exclusive; /* enable node exclusive allocation if available */ + + double duration; /* attributes.system.duration if set */ + + char error[JJ_ERROR_TEXT_LENGTH]; /* On error, contains error description */ +}; + +/* Parse jobspec from json string `spec`, return resource request summary + * in `counts` on success. + * Returns 0 on success and -1 on failure with errno set and jj->error[] + * with an error message string. + */ + +int jj_get_counts (const char *spec, struct jj_counts *counts); + +/* Identical to jj_get_counts, but take json_t */ +int jj_get_counts_json (json_t *jobspec, struct jj_counts *counts); + +#endif /* !HAVE_JJ_H */ +} diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 36b5853c..3500ec8f 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -32,6 +32,8 @@ extern "C" { // custom bank_info class file #include "accounting.hpp" +// custom job resource counting file +#include "jj.hpp" // the plugin does not know about the association who submitted a job and will // assign default values to the association until it receives information from @@ -198,6 +200,42 @@ static void add_special_association (flux_plugin_t *p, flux_t *h, int userid) } +/* + * Using the jobspec from a job, increment the cur_nodes and cur_cores counts + * for an association. 
+ */ +static int increment_resources (Association *b, json_t *jobspec) +{ + struct jj_counts counts; + + if (jj_get_counts_json (jobspec, &counts) < 0) + return -1; + + b->cur_nodes = b->cur_nodes + counts.nnodes; + b->cur_cores = b->cur_cores + (counts.nslots * counts.slot_size); + + return 0; +} + + +/* + * Using the jobspec from a job, decrement the cur_nodes and cur_cores counts + * for an association. + */ +static int decrement_resources (Association *b, json_t *jobspec) +{ + struct jj_counts counts; + + if (jj_get_counts_json (jobspec, &counts) < 0) + return -1; + + b->cur_nodes = b->cur_nodes - counts.nnodes; + b->cur_cores = b->cur_cores - (counts.nslots * counts.slot_size); + + return 0; +} + + /****************************************************************************** * * * Callbacks * @@ -883,6 +921,19 @@ static int run_cb (flux_plugin_t *p, { int userid; Association *b; + json_t *jobspec = NULL; + + flux_t *h = flux_jobtap_get_flux (p); + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:o}", + "jobspec", &jobspec) < 0) { + flux_log (h, + LOG_ERR, + "flux_plugin_arg_unpack: %s", + flux_plugin_arg_strerror (args)); + return -1; + } b = static_cast (flux_jobtap_job_aux_get (p, @@ -897,8 +948,27 @@ static int run_cb (flux_plugin_t *p, return -1; } - // increment the user's current running jobs count + // increment the user's current running jobs and resources counts b->cur_run_jobs++; + if (jobspec == NULL) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "job.state.run: failed to unpack " \ + "jobspec"); + return -1; + } else { + if (increment_resources (b, jobspec) < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "job.state.run: failed to increment " \ + "resource count"); + return -1; + } + } return 0; } @@ -1099,12 +1169,14 @@ static int inactive_cb (flux_plugin_t *p, { int userid; Association *b; + json_t *jobspec = NULL; flux_t *h = flux_jobtap_get_flux 
(p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i}", - "userid", &userid) < 0) { + "{s:i, s:o}", + "userid", &userid, + "jobspec", &jobspec) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -1131,8 +1203,27 @@ static int inactive_cb (flux_plugin_t *p, return 0; // this job was running, so decrement the current running jobs count - // and look to see if any held jobs can be released + // and the resources count and look to see if any held jobs can be released b->cur_run_jobs--; + if (jobspec == NULL) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "job.state.inactive: failed to " \ + "unpack jobspec"); + return -1; + } else { + if (decrement_resources (b, jobspec) < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "job.state.inactive: failed to " \ + "decrement resource count"); + return -1; + } + } // if the user/bank combo has any currently held jobs and the user is now // under their max jobs limit, remove the dependency from first held job diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp index b444b184..7bb73414 100644 --- a/src/plugins/test/accounting_test01.cpp +++ b/src/plugins/test/accounting_test01.cpp @@ -56,7 +56,9 @@ void add_user_to_map ( a.projects, a.def_project, a.max_nodes, - a.max_cores + a.max_cores, + a.cur_nodes, + a.cur_cores }; } @@ -68,9 +70,9 @@ void initialize_map ( std::map> &users) { Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, 2147483647, 0, 0}; Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, 2147483647, 0, 0}; add_user_to_map (users, 1001, "bank_A", user1); users_def_bank[1001] = "bank_A"; @@ -272,7 +274,8 @@ static void test_check_map_dne_true () users_def_bank.clear (); Association tmp_user = {"DNE", 0.5, 
5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, 2147483647, + 0, 0}; add_user_to_map (users, 9999, "DNE", tmp_user); users_def_bank[9999] = "DNE"; diff --git a/t/Makefile.am b/t/Makefile.am index a84e27e9..97edba86 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -43,6 +43,7 @@ TESTSCRIPTS = \ t1041-view-jobs-by-project.t \ t1042-issue508.t \ t1043-view-jobs-by-bank.t \ + t1044-mf-priority-resource-limits.t \ t5000-valgrind.t \ python/t1000-example.py \ python/t1001_db.py \ diff --git a/t/t1044-mf-priority-resource-limits.t b/t/t1044-mf-priority-resource-limits.t new file mode 100755 index 00000000..29ee970f --- /dev/null +++ b/t/t1044-mf-priority-resource-limits.t @@ -0,0 +1,94 @@ +#!/bin/bash + +test_description='track resources across running jobs per-association in priority plugin' + +. `dirname $0`/sharness.sh + +mkdir -p conf.d + +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 4 job -o,--config-path=$(pwd)/conf.d + +flux setattr log-stderr-level 1 + +test_expect_success 'allow guest access to testexec' ' + flux config load <<-EOF + [exec.testexec] + allow-guests = true + EOF +' + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'check that mf_priority plugin is loaded' ' + flux jobtap list | grep mf_priority +' + +test_expect_success 'create flux-accounting DB' ' + flux account -p ${DB_PATH} create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'add banks' ' + flux account add-bank root 1 && + flux account add-bank --parent-bank=root A 1 +' + +test_expect_success 'add an association' ' + flux account add-user --username=user1 
--userid=5001 --bank=A +' + +test_expect_success 'send flux-accounting DB information to the plugin' ' + flux account-priority-update -p ${DB_PATH} +' + +test_expect_success 'submit 2 jobs that take up 1 node each; check resource counts' ' + job1=$(flux python ${SUBMIT_AS} 5001 -N1 sleep 60) && + flux job wait-event -f json ${job1} priority && + job2=$(flux python ${SUBMIT_AS} 5001 -N1 sleep 60) && + flux job wait-event -f json ${job2} priority && + flux jobtap query mf_priority.so > query.json && + test_debug "jq -S . query.json && + test_debug "jq -S . query.json && + test_debug "jq -S . query.json && + test_debug "jq -S .