Problem: We do not have good examples for replicating flux job info in Python.
Solution: Add an interactive demo.

Signed-off-by: vsoch <[email protected]>
Showing 12 changed files with 869 additions and 3 deletions.
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n# Introductory example - Job Info API\n\nThis example will show how to get information\nabout a job after the fact.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import json\nimport os\nimport subprocess\nimport time\n\nimport flux\nfrom flux.job import JobspecV1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we instantiate a Flux handle, which connects to the running Flux instance.\nIf you were running this on a cluster with Flux, you'd likely already be able to\nconnect. If you are testing on your own, you might need to run flux start --test-size=4 first.\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "handle = flux.Flux()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is a new jobspec, or a recipe for a flux job. You'll notice we are providing a command\ndirectly, along with tasks, nodes, and cores per task. You could also provide a script here.\nIf we were doing this on the command line, it would be equivalent to what is generated by:\nflux submit --ntasks=1 --nodes=1 --cores-per-task=1 hostname\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "jobspec = JobspecV1.from_command(\n    command=[\"hostname\"], num_tasks=1, num_nodes=1, cores_per_task=1\n)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is how we set the \"current working directory\" (cwd) for the job.\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "jobspec.cwd = os.getcwd()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is how we set the job environment.\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "jobspec.environment = dict(os.environ)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's submit the job! We will get the job id back.\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "jobid = flux.job.submit(handle, jobspec)\ntime.sleep(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's say we store that jobid somewhere. How do we get info later?\nWe know that if we ran flux jobs -a on the command line, we'd see the job:\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "res = subprocess.getoutput('flux jobs -a')\nprint(res)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you are an expert user, you know that you can see metadata for a job.\nThis command, without a key, will show the keys available to you:\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "res = subprocess.getoutput(f'flux job info {jobid} | true')\nprint(res)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since the underlying logic here queries the Flux KVS (key value store),\nwe can select one of those keys to view. For example, here is the jobspec:\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "res = subprocess.getoutput(f'flux job info {jobid} jobspec')\nprint(res)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is great, but ideally we can get this metadata directly from Python.\nFirst, here is a way to get basic jobinfo. Given that we start with a string jobid,\nwe first want to parse it back into a Flux JobID, and then prepare\na payload for the job-list RPC to say \"give me all the attributes back\":\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "fluxjob = flux.job.JobID(jobid)\npayload = {\"id\": fluxjob, \"attrs\": [\"all\"]}\nrpc = flux.job.list.JobListIdRPC(handle, \"job-list.list-id\", payload)\njobinfo = rpc.get_job()\nprint(json.dumps(jobinfo, indent=4))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can also get less commonly used (and thus less commonly exposed) metadata,\nsuch as the emoji state!\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "info = rpc.get_jobinfo()\nprint(info.__dict__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But with either of the above approaches, we don't get anything back about our\noriginal jobspec! For that we need to query the KVS. Notice that here we\nhave metadata like the current working directory (cwd):\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "kvs = flux.job.job_kvs(handle, jobid)\njobspec = kvs.get('jobspec')\nprint(json.dumps(jobspec))\ntime.sleep(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, to watch (or stream) output, you can do the following.\nEach event here is a JSON structure that you can further parse.\nFor example, if \"data\" is present as a key, it is usually output:\n\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "for line in flux.job.event_watch(handle, jobid, \"guest.output\"):\n    print(line)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
# -*- coding: utf-8 -*-
"""
Introductory example - Job Info API
===================================

This example will show how to get information
about a job after the fact.
"""

import json
import os
import subprocess
import time

import flux
from flux.job import JobspecV1
#%%
# Here we instantiate a Flux handle, which connects to the running Flux instance.
# If you were running this on a cluster with Flux, you'd likely already be able to
# connect. If you are testing on your own, you might need to run flux start --test-size=4 first.
handle = flux.Flux()

#%%
# This is a new jobspec, or a recipe for a flux job. You'll notice we are providing a command
# directly, along with tasks, nodes, and cores per task. You could also provide a script here.
# If we were doing this on the command line, it would be equivalent to what is generated by:
# flux submit --ntasks=1 --nodes=1 --cores-per-task=1 hostname
jobspec = JobspecV1.from_command(
    command=["hostname"], num_tasks=1, num_nodes=1, cores_per_task=1
)

#%%
# This is how we set the "current working directory" (cwd) for the job.
jobspec.cwd = os.getcwd()

#%%
# This is how we set the job environment.
jobspec.environment = dict(os.environ)

#%%
# Let's submit the job! We will get the job id back.
jobid = flux.job.submit(handle, jobspec)
time.sleep(2)

#%%
# Now let's say we store that jobid somewhere. How do we get info later?
# We know that if we ran flux jobs -a on the command line, we'd see the job:
res = subprocess.getoutput('flux jobs -a')
print(res)
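If you only have the captured text of `flux jobs -a`, plain string handling is enough to pull job ids back out. A minimal sketch, assuming a header row with a JOBID column; the sample output and the id `f2vEiVaM` below are hypothetical, and real column layout can vary, which is why we key off the header rather than fixed positions:

```python
# Hypothetical captured output of `flux jobs -a` (whitespace-delimited columns).
sample = """\
       JOBID USER     NAME       ST NTASKS NNODES     TIME INFO
    f2vEiVaM vsoch    hostname   CD      1      1   0.081s
"""

lines = sample.splitlines()
# Locate the JOBID column from the header instead of hard-coding an offset.
header = lines[0].split()
jobid_col = header.index("JOBID")
# Collect the id from each non-empty data row.
jobids = [row.split()[jobid_col] for row in lines[1:] if row.strip()]
print(jobids)
```

For anything beyond a quick script, a structured output mode (if your Flux version offers one) is sturdier than scraping columns.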
#%%
# If you are an expert user, you know that you can see metadata for a job.
# This command, without a key, will show the keys available to you:
res = subprocess.getoutput(f'flux job info {jobid} | true')
print(res)

#%%
# Since the underlying logic here queries the Flux KVS (key value store),
# we can select one of those keys to view. For example, here is the jobspec:
res = subprocess.getoutput(f'flux job info {jobid} jobspec')
print(res)
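The jobspec printed above is JSON, so the stdlib `json` module is all you need to pull individual fields back out of the captured string. A sketch with a hypothetical, pared-down V1-shaped jobspec (the real document carries more fields):

```python
import json

# Hypothetical minimal jobspec string, shaped like the V1 document above.
jobspec_str = json.dumps({
    "version": 1,
    "tasks": [{"command": ["hostname"], "slot": "task", "count": {"per_slot": 1}}],
    "attributes": {"system": {"duration": 0, "cwd": "/home/user"}},
})

doc = json.loads(jobspec_str)
# The command lives under tasks; cwd lives under attributes.system.
command = doc["tasks"][0]["command"]
cwd = doc["attributes"]["system"].get("cwd")
print(command, cwd)
```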
#%%
# This is great, but ideally we can get this metadata directly from Python.
# First, here is a way to get basic jobinfo. Given that we start with a string jobid,
# we first want to parse it back into a Flux JobID, and then prepare
# a payload for the job-list RPC to say "give me all the attributes back":
fluxjob = flux.job.JobID(jobid)
payload = {"id": fluxjob, "attrs": ["all"]}
rpc = flux.job.list.JobListIdRPC(handle, "job-list.list-id", payload)
jobinfo = rpc.get_job()
print(json.dumps(jobinfo, indent=4))
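Timestamps in job-list style output are epoch seconds, so a small stdlib conversion makes them human-readable. A sketch with a hypothetical jobinfo dictionary (the field names `t_submit` and `t_run` follow typical job-list output, but treat them as assumptions):

```python
from datetime import datetime, timezone

# Hypothetical sample: epoch seconds as they appear in job-list style output.
jobinfo = {"id": 123456, "t_submit": 1700000000.25, "t_run": 1700000002.5}

for key in ("t_submit", "t_run"):
    # Convert epoch seconds to an aware UTC datetime for display.
    ts = datetime.fromtimestamp(jobinfo[key], tz=timezone.utc)
    print(key, ts.isoformat())
```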
#%%
# You can also get less commonly used (and thus less commonly exposed) metadata,
# such as the emoji state!
info = rpc.get_jobinfo()
print(info.__dict__)

#%%
# But with either of the above approaches, we don't get anything back about our
# original jobspec! For that we need to query the KVS. Notice that here we
# have metadata like the current working directory (cwd):
kvs = flux.job.job_kvs(handle, jobid)
jobspec = kvs.get('jobspec')
print(json.dumps(jobspec))
time.sleep(2)
#%%
# Finally, to watch (or stream) output, you can do the following.
# Each event here is a JSON structure that you can further parse.
# For example, if "data" is present as a key, it is usually output:
for line in flux.job.event_watch(handle, jobid, "guest.output"):
    print(line)
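To sketch the "data" parsing idea without a running Flux instance, here is a stdlib-only pass over eventlog-style lines, one JSON object per line. The sample events and the `noodle-1` hostname are hypothetical; the exact event shapes are assumptions for illustration:

```python
import json

# Hypothetical output eventlog: one JSON event per line.
eventlog = """\
{"timestamp": 1700000001.0, "name": "header", "context": {"version": 1}}
{"timestamp": 1700000002.0, "name": "data", "context": {"stream": "stdout", "data": "noodle-1\\n"}}
"""

output = []
for line in eventlog.splitlines():
    event = json.loads(line)
    # Only "data" events carry job output; other events are metadata.
    if event["name"] == "data":
        output.append(event["context"]["data"])
print("".join(output))
```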
553b1d62e9c0adeedf1804adf3134c46