Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python: support convenience API for job-info.lookup RPC / "flux job info" #5265

Merged
merged 5 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/bindings/python/flux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ nobase_fluxpy_PYTHON = \
job/kill.py \
job/kvs.py \
job/list.py \
job/kvslookup.py \
job/info.py \
job/wait.py \
job/submit.py \
Expand Down
1 change: 1 addition & 0 deletions src/bindings/python/flux/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from flux.job.submit import submit_async, submit, submit_get_id
from flux.job.info import JobInfo, JobInfoFormat, job_fields_to_attrs
from flux.job.list import job_list, job_list_inactive, job_list_id, JobList, get_job
from flux.job.kvslookup import job_info_lookup, JobKVSLookup, job_kvs_lookup
from flux.job.wait import wait_async, wait, wait_get_status, result_async, result
from flux.job.event import (
event_watch_async,
Expand Down
171 changes: 171 additions & 0 deletions src/bindings/python/flux/job/kvslookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
###############################################################
# Copyright 2023 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################
import errno
import json

from flux.future import WaitAllFuture
from flux.job import JobID
from flux.rpc import RPC


# a few keys are special, decode them into dicts if you can
def decode_special_metadata(metadata):
for key in ("jobspec", "R"):
if key in metadata:
try:
tmp = json.loads(metadata[key])
metadata[key] = tmp
except json.decoder.JSONDecodeError:
# Ignore if can't be decoded
pass


class JobInfoLookupRPC(RPC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.jobid = None

def get(self):
return super().get()

def get_decode(self):
metadata = super().get()
decode_special_metadata(metadata)
return metadata


def job_info_lookup(flux_handle, jobid, keys=["jobspec"]):
payload = {"id": int(jobid), "keys": keys, "flags": 0}
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
rpc.jobid = jobid
return rpc


# jobs_kvs_lookup simple variant for one jobid
def job_kvs_lookup(flux_handle, jobid, keys=["jobspec"], decode=True):
"""
Lookup job kvs data based on a jobid

:flux_handle: A Flux handle obtained from flux.Flux()
:jobid: jobid to lookup info for
:keys: Optional list of keys to fetch. (default is "jobspec")
:decode: Optional flag to decode special data into Python data structures
currently decodes "jobspec" and "R" into dicts
(default True)
"""
payload = {"id": int(jobid), "keys": keys, "flags": 0}
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
try:
if decode:
rsp = rpc.get_decode()
else:
rsp = rpc.get()
# The job does not exist!
except FileNotFoundError:
return None
return rsp


class JobKVSLookupFuture(WaitAllFuture):
"""Wrapper Future for multiple jobids"""

def __init__(self):
super(JobKVSLookupFuture, self).__init__()
self.errors = []

def _get(self, decode=True):
jobs = []
# Wait for all RPCs to complete
self.wait_for()

# Get all successful jobs, accumulate errors in self.errors
for child in self.children:
try:
if decode:
rsp = child.get_decode()
else:
rsp = child.get()
jobs.append(rsp)
except EnvironmentError as err:
if err.errno == errno.ENOENT:
msg = f"JobID {child.jobid.orig} unknown"
else:
msg = f"rpc: {err.strerror}"
self.errors.append(msg)
return jobs

def get(self):
"""get all successful results, appending errors into self.errors"""
return self._get(False)

def get_decode(self):
"""
get all successful results, appending errors into self.errors. Decode
special data into Python data structures
"""
return self._get(True)


class JobKVSLookup:
"""User friendly class to lookup job KVS data

:flux_handle: A Flux handle obtained from flux.Flux()
:ids: List of jobids to get data for
:keys: Optional list of keys to fetch. (default is "jobspec")
:decode: Optional flag to decode special data into Python data structures
currently decodes "jobspec" and "R" into dicts
(default True)
"""

def __init__(
self,
flux_handle,
ids=[],
keys=["jobspec"],
decode=True,
):
self.handle = flux_handle
self.keys = list(keys)
self.ids = list(map(JobID, ids)) if ids else []
self.decode = decode
self.errors = []

def fetch_data(self):
"""Initiate the job info lookup to the Flux job-info module

JobKVSLookup.fetch_data() returns a JobKVSLookupFuture,
which will be fulfilled when the job data is available.

Once the Future has been fulfilled, a list of objects
can be obtained via JobKVSLookup.data(). If
JobKVSLookupFuture.errors is non-empty, then it will contain a
list of errors returned via the query.
"""
listids = JobKVSLookupFuture()
for jobid in self.ids:
listids.push(job_info_lookup(self.handle, jobid, self.keys))
return listids

def data(self):
"""Synchronously fetch a list of data responses

If the Future object returned by JobKVSLookup.fetch_data has
not yet been fulfilled (e.g. is_ready() returns False), then this call
may block. Otherwise, returns a list of responses for all job ids
returned.
"""
rpc = self.fetch_data()
if self.decode:
data = rpc.get_decode()
else:
data = rpc.get()
if hasattr(rpc, "errors"):
self.errors = rpc.errors
return data
7 changes: 7 additions & 0 deletions src/modules/job-info/lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ static void info_lookup_continuation (flux_future_t *fall, void *arg)
size_t index;
json_t *key;
json_t *o = NULL;
json_t *tmp = NULL;
char *data = NULL;

if (!l->allow) {
Expand All @@ -184,6 +185,12 @@ static void info_lookup_continuation (flux_future_t *fall, void *arg)
if (!(o = json_object ()))
goto enomem;

tmp = json_integer (l->id);
if (json_object_set_new (o, "id", tmp) < 0) {
json_decref (tmp);
goto enomem;
}

json_array_foreach(l->keys, index, key) {
flux_future_t *f;
const char *keystr;
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ TESTSCRIPTS = \
python/t0010-job.py \
python/t0012-futures.py \
python/t0013-job-list.py \
python/t0014-job-kvslookup.py \
python/t0020-hostlist.py \
python/t0021-idset.py \
python/t0022-resource-set.py \
Expand Down
20 changes: 10 additions & 10 deletions t/python/t0010-job.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,8 +689,8 @@ def cancel_on_start(future, jobid):
def test_33_get_job(self):
self.sleep_jobspec = JobspecV1.from_command(["sleep", "5"])
jobid = job.submit(self.fh, self.sleep_jobspec)
meta = job.get_job(self.fh, jobid)
self.assertIsInstance(meta, dict)
info = job.get_job(self.fh, jobid)
self.assertIsInstance(info, dict)
for key in [
"id",
"userid",
Expand All @@ -712,17 +712,17 @@ def test_33_get_job(self):
"nodelist",
"exception",
]:
self.assertIn(key, meta)
self.assertIn(key, info)

self.assertEqual(meta["id"], jobid)
self.assertEqual(meta["name"], "sleep")
self.assertTrue(meta["state"] in ["SCHED", "DEPEND", "RUN"])
self.assertEqual(meta["ntasks"], 1)
self.assertEqual(meta["ncores"], 1)
self.assertEqual(info["id"], jobid)
self.assertEqual(info["name"], "sleep")
self.assertTrue(info["state"] in ["SCHED", "DEPEND", "RUN"])
self.assertEqual(info["ntasks"], 1)
self.assertEqual(info["ncores"], 1)

# Test a job that does not exist
meta = job.get_job(self.fh, 123456)
self.assertIsNone(meta)
info = job.get_job(self.fh, 123456)
self.assertIsNone(info)

def test_34_timeleft(self):
spec = JobspecV1.from_command(
Expand Down
Loading