Skip to content

Commit

Permalink
traverser: add capability to output agfilter data
Browse files Browse the repository at this point in the history
Problem: after extracting the jobid and/or agfilter boolean with the
evaluator, the traverser needs to fetch the aggregate filter counts by
jobid (if the ID is present in the find string) or total utilization.

Add conditional behavior to fetch appropriate aggregate filter data
depending on the presence of jobid and the value of the agfilter
boolean.
  • Loading branch information
milroy committed Jan 10, 2025
1 parent 913e3f6 commit 548a543
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
35 changes: 33 additions & 2 deletions resource/traversers/dfu_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,8 @@ int dfu_impl_t::dom_find_dfv (std::shared_ptr<match_writers_t> &w,
bool reserved = !(*m_graph)[u].schedule.reservations.empty ();
Flux::resource_model::vtx_predicates_override_t p_overridden = p;
p_overridden.set (down, allocated, reserved);
std::map<std::string, std::string> agfilter_data;
planner_multi_t *filter_plan = NULL;

(*m_graph)[u].idata.colors[dom] = m_color.gray ();
m_trav_level++;
Expand Down Expand Up @@ -833,15 +835,44 @@ int dfu_impl_t::dom_find_dfv (std::shared_ptr<match_writers_t> &w,
}
if (agfilter) {
// Check if there's a pruning (aggregate) filter initialized
if ((*m_graph)[u].idata.subplans[dom] == NULL)
if ((filter_plan = (*m_graph)[u].idata.subplans[dom]) == NULL)
goto done;
if (jobid == 0) { // jobid not specified; get totals
for (size_t i = 0; i < planner_multi_resources_len (filter_plan); ++i) {
int64_t total_resources = planner_multi_resource_total_at (filter_plan, i);
int64_t diff =
total_resources - planner_multi_avail_resources_at (filter_plan, 0, i);
std::string rtype = std::string (planner_multi_resource_type_at (filter_plan, i));
std::string fcounts =
"used:" + std::to_string (diff) + ", total:" + std::to_string (total_resources);
agfilter_data.insert ({rtype, fcounts});
}
} else { // get agfilter utilization for specified jobid
auto &job2span = (*m_graph)[u].idata.job2span;
auto span_it = job2span.find (jobid);
if (span_it == (*m_graph)[u].idata.job2span.end ()) {
m_err_msg += __FUNCTION__;
m_err_msg += ": span missing in job2span ";
m_err_msg += " for vertex: " + (*m_graph)[u].name + "\n";
goto done;
}
for (size_t i = 0; i < planner_multi_resources_len (filter_plan); ++i) {
int64_t total_resources = planner_multi_resource_total_at (filter_plan, i);
int64_t planned_resources =
planner_multi_span_planned_at (filter_plan, span_it->second, i);
std::string rtype = std::string (planner_multi_resource_type_at (filter_plan, i));
std::string fcounts = "used:" + std::to_string (planned_resources)
+ ", total:" + std::to_string (total_resources);
agfilter_data.insert ({rtype, fcounts});
}
}
}

// Need to clear out any stale data from the ephemeral object before
// emitting the vertex, since data could be leftover from previous
// traversals where the vertex was matched but not emitted
(*m_graph)[u].idata.ephemeral.check_and_clear_if_stale (m_best_k_cnt);
if ((rc = w->emit_vtx (level (), *m_graph, u, (*m_graph)[u].size, true)) < 0) {
if ((rc = w->emit_vtx (level (), *m_graph, u, (*m_graph)[u].size, agfilter_data, true)) < 0) {
m_err_msg += __FUNCTION__;
m_err_msg += std::string (": error from emit_vtx: ") + strerror (errno);
goto done;
Expand Down
3 changes: 2 additions & 1 deletion resource/traversers/dfu_impl_update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ int dfu_impl_t::emit_vtx (vtx_t u,
unsigned int needs,
bool exclusive)
{
return w->emit_vtx (level (), (*m_graph), u, needs, exclusive);
const std::map<std::string, std::string> agfilter_data;
return w->emit_vtx (level (), (*m_graph), u, needs, agfilter_data, exclusive);
}

int dfu_impl_t::emit_edg (edg_t e, std::shared_ptr<match_writers_t> &w)
Expand Down

0 comments on commit 548a543

Please sign in to comment.