Skip to content

Commit

Permalink
Merge pull request #561 from cmoussa1/plugin.track.resources
Browse files Browse the repository at this point in the history
plugin: track the resources used across all of an association's running jobs
  • Loading branch information
mergify[bot] authored Jan 13, 2025
2 parents 909529b + 4494d3c commit 0ffd06e
Show file tree
Hide file tree
Showing 10 changed files with 433 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ noinst_HEADERS = \
fairness/writer/data_writer_base.hpp \
fairness/writer/data_writer_db.hpp \
fairness/writer/data_writer_stdout.hpp \
plugins/accounting.hpp
plugins/accounting.hpp \
plugins/jj.hpp

fairness_libweighted_tree_la_SOURCES = \
fairness/account/account.cpp \
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ jobtapdir = \
$(fluxlibdir)/job-manager/plugins/

jobtap_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp jj.cpp
mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
5 changes: 4 additions & 1 deletion src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ json_t* Association::to_json () const

// 'o' steals the reference for both held_job_ids and user_queues
json_t *u = json_pack ("{s:s, s:f, s:i, s:i, s:i, s:i, s:o,"
" s:o, s:i, s:o, s:s, s:i, s:i, s:i}",
" s:o, s:i, s:o, s:s, s:i, s:i, s:i,"
" s:i, s:i}",
"bank_name", bank_name.c_str (),
"fairshare", fairshare,
"max_run_jobs", max_run_jobs,
Expand All @@ -100,6 +101,8 @@ json_t* Association::to_json () const
"def_project", def_project.c_str (),
"max_nodes", max_nodes,
"max_cores", max_cores,
"cur_nodes", cur_nodes,
"cur_cores", cur_cores,
"active", active);

if (!u)
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Association {
std::string def_project; // default project
int max_nodes; // max num nodes across all running jobs
int max_cores; // max num cores across all running jobs
int cur_nodes; // current number of used nodes
int cur_cores; // current number of used cores

// methods
json_t* to_json () const; // convert object to JSON string
Expand Down
165 changes: 165 additions & 0 deletions src/plugins/jj.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/************************************************************\
* Copyright 2014 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

extern "C" {
#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <errno.h>
#include <string.h>
#include <jansson.h>

#include "jj.hpp"

static int jj_read_level (json_t *o, int level, struct jj_counts *jj);

static int jj_read_vertex (json_t *o, int level, struct jj_counts *jj)
{
int count;
const char *type = NULL;
json_t *with = NULL;
json_error_t error;
int exclusive = 0;

if (json_unpack_ex (o, &error, 0, "{ s:s s:i s?b s?o }",
"type", &type,
"count", &count,
"exclusive", &exclusive,
"with", &with) < 0) {
snprintf (jj->error, sizeof (jj->error) - 1,
"level %d: %s", level, error.text);
errno = EINVAL;
return -1;
}
if (count <= 0) {
sprintf (jj->error, "Invalid count %d for type '%s'",
count, type);
errno = EINVAL;
return -1;
}
if (streq (type, "node")) {
jj->nnodes = count;
if (exclusive)
jj->exclusive = true;
}
else if (streq (type, "slot"))
jj->nslots = count;
else if (streq (type, "core"))
jj->slot_size = count;
else if (streq (type, "gpu"))
jj->slot_gpus = count;
else {
sprintf (jj->error, "Unsupported resource type '%s'", type);
errno = EINVAL;
return -1;
}
if (with)
return jj_read_level (with, level+1, jj);
return 0;

}

static int jj_read_level (json_t *o, int level, struct jj_counts *jj)
{
int i;
json_t *v = NULL;

if (!json_is_array (o)) {
snprintf (jj->error, sizeof (jj->error) - 1,
"level %d: must be an array", level);
errno = EINVAL;
return -1;
}
json_array_foreach (o, i, v) {
if (jj_read_vertex (v, level, jj) < 0)
return -1;
}
return 0;
}

int jj_get_counts (const char *spec, struct jj_counts *jj)
{
json_t *o = NULL;
json_error_t error;
int rc = -1;

if ((o = json_loads (spec, 0, &error)) == NULL) {
snprintf (jj->error, sizeof (jj->error) - 1,
"JSON load: %s", error.text);
errno = EINVAL;
return -1;
}

rc = jj_get_counts_json (o, jj);
json_decref (o);
return rc;
}

int jj_get_counts_json (json_t *jobspec, struct jj_counts *jj)
{
int version;
json_t *resources = NULL;
json_error_t error;

if (!jj) {
errno = EINVAL;
return -1;
}
memset (jj, 0, sizeof (*jj));

if (json_unpack_ex (jobspec, &error, 0, "{s:i s:o}",
"version", &version,
"resources", &resources) < 0) {
snprintf (jj->error, sizeof (jj->error) - 1,
"at top level: %s", error.text);
errno = EINVAL;
return -1;
}
if (version != 1) {
snprintf (jj->error, sizeof (jj->error) - 1,
"Invalid version: expected 1, got %d", version);
errno = EINVAL;
return -1;
}
/* N.B. attributes.system is generally optional, but
* attributes.system.duration is required in jobspec version 1 */
if (json_unpack_ex (jobspec, &error, 0, "{s:{s:{s:F}}}",
"attributes",
"system",
"duration", &jj->duration) < 0) {
snprintf (jj->error, sizeof (jj->error) - 1,
"at top level: getting duration: %s", error.text);
errno = EINVAL;
return -1;
}
if (jj_read_level (resources, 0, jj) < 0)
return -1;

if (jj->nslots <= 0) {
snprintf (jj->error, sizeof (jj->error) - 1,
"Unable to determine slot count");
errno = EINVAL;
return -1;
}
if (jj->slot_size <= 0) {
snprintf (jj->error, sizeof (jj->error) - 1,
"Unable to determine slot size");
errno = EINVAL;
return -1;
}
if (jj->nnodes)
jj->nslots *= jj->nnodes;
return 0;
}

}
/* vi: ts=4 sw=4 expandtab
*/
62 changes: 62 additions & 0 deletions src/plugins/jj.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/************************************************************\
* Copyright 2014 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

extern "C" {
#ifndef HAVE_JJ_H
#define HAVE_JJ_H 1

#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <jansson.h>
#include <stdbool.h>

#define JJ_ERROR_TEXT_LENGTH 256

/**
* streq - Are two strings equal?
* @a: first string
* @b: first string
*
* This macro is arguably more readable than "!strcmp(a, b)".
*
* Example:
* if (streq(somestring, ""))
* printf("String is empty!\n");
*/
#define streq(a,b) (strcmp((a),(b)) == 0)

struct jj_counts {
int nnodes; /* total number of nodes requested */
int nslots; /* total number of slots requested */
int slot_size; /* number of cores per slot */
int slot_gpus; /* number of gpus per slot */

bool exclusive; /* enable node exclusive allocation if available */

double duration; /* attributes.system.duration if set */

char error[JJ_ERROR_TEXT_LENGTH]; /* On error, contains error description */
};

/* Parse jobspec from json string `spec`, return resource request summary
* in `counts` on success.
* Returns 0 on success and -1 on failure with errno set and jj->error[]
* with an error message string.
*/

int jj_get_counts (const char *spec, struct jj_counts *counts);

/* Identical to jj_get_counts, but take json_t */
int jj_get_counts_json (json_t *jobspec, struct jj_counts *counts);

#endif /* !HAVE_JJ_H */
}
Loading

0 comments on commit 0ffd06e

Please sign in to comment.