add example for job info #229

Open · wants to merge 3 commits into master
3 changes: 2 additions & 1 deletion Makefile
@@ -17,4 +17,5 @@ help:
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
-	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+	cp _build/html/_images/sphx_glr_example_job_submit_api_thumb.png _build/html/_images/sphx_glr_example_job_info_api_thumb.png
Binary file modified auto_examples/auto_examples_jupyter.zip
Binary file not shown.
Binary file modified auto_examples/auto_examples_python.zip
Binary file not shown.
259 changes: 259 additions & 0 deletions auto_examples/example_job_info_api.ipynb
@@ -0,0 +1,259 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Introductory example - Job Info API\n\nThis example will show how to get information\nabout a job after the fact.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import json\nimport time\nimport os\nimport flux\nfrom flux.job import JobspecV1\nimport subprocess"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here we instantiate a flux handle. This will connect to the running flux instance.\nIf you were running this on a cluster with Flux, you'd likely already be able to\nconnect. If you are testing out on your own, you might need to do flux start --test-size=4\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"handle = flux.Flux()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is a new jobspec, or a recipe for a flux job. You'll notice we are providing a command\ndirectly, along with tasks, nodes, and cores per task. You could also provide a script here.\nIf we were doing this on the command line, it would be equivalent to what is generated by:\nflux submit --ntasks=4 --nodes=2 --cores-per-task=2 sleep 10\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"jobspec = JobspecV1.from_command(\n command=[\"hostname\"], num_tasks=1, num_nodes=1, cores_per_task=1\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is how we set the \"current working directory\" (cwd) for the job\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"jobspec.cwd = os.getcwd()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is how we set the job environment\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"jobspec.environment = dict(os.environ)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's submit the job! We will get the job id.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"jobid = flux.job.submit(handle, jobspec)\ntime.sleep(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's say we store that jobid somewhere how do we get info later?\nWe know that if we ran flux jobs -a on the command line, we'd see the job\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"res = subprocess.getoutput('flux jobs -a')\nprint(res)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And if you are an expert user, you know that you can see metadata for a job\nThis command, without a key, will show the keys available to you\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"res = subprocess.getoutput(f'flux job info {jobid} | true')\nprint(res)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And since the underlying logic here is pinging the flux KVS or key value store,\nwe can select one of those keys to view. For example, here is the jobspec\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"res = subprocess.getoutput(f'flux job info {jobid} jobspec')\nprint(res)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is great, but ideally we can get this metadata directly from Python.\nFirst, here is a way to get basic jobinfo. Given we start with a string jobid,\nwe will first want to parse it back into a Flux JobID, and then prepare\na payload to the Job List RPC to say \"give me all the attributes back\"\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"fluxjob = flux.job.JobID(jobid)\npayload = {\"id\": fluxjob, \"attrs\": [\"all\"]}\nrpc = flux.job.list.JobListIdRPC(handle, \"job-list.list-id\", payload)\njobinfo = rpc.get_job()\nprint(json.dumps(jobinfo, indent=4))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can get less commonly used (and thus exposed) metadata like this\nsuch as the emoji state!\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"info = rpc.get_jobinfo()\nprint(info.__dict__)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"But for either of the above approaches, we aren't getting anything back about our\noriginal jobspec! That's because we need to query the KVS for that. Notice here we\nhave metadata like the current working directory (cwd)\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"kvs = flux.job.job_kvs(handle, jobid)\njobspec = kvs.get('jobspec')\nprint(json.dumps(jobspec))\ntime.sleep(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, to watch (or stream) output, you can do the following.\nEach line here is a json structure that you can further parse.\nAs an example, if \"data\" is present as a key, this usually is output\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"for line in flux.job.event_watch(handle, jobid, \"guest.output\"):\n print(line)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
98 changes: 98 additions & 0 deletions auto_examples/example_job_info_api.py
@@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-
"""
Introductory example - Job Info API
===================================

This example will show how to get information
about a job after the fact.
"""


import json
import time
import os
import flux
from flux.job import JobspecV1
import subprocess

#%%
# Here we instantiate a Flux handle, which connects to the running Flux instance.
# If you were running this on a cluster with Flux, you'd likely already be able to
# connect. If you are testing on your own machine, you may first need to run flux start --test-size=4.
handle = flux.Flux()

#%%
# This is a new jobspec, or a recipe for a flux job. You'll notice we are providing a command
# directly, along with tasks, nodes, and cores per task. You could also provide a script here.
# If we were doing this on the command line, it would be equivalent to what is generated by:
# flux submit --ntasks=1 --nodes=1 --cores-per-task=1 hostname
jobspec = JobspecV1.from_command(
    command=["hostname"], num_tasks=1, num_nodes=1, cores_per_task=1
)

#%%
# This is how we set the "current working directory" (cwd) for the job
jobspec.cwd = os.getcwd()

#%%
# This is how we set the job environment
jobspec.environment = dict(os.environ)

#%%
# Let's submit the job! We will get the job id.
jobid = flux.job.submit(handle, jobspec)
time.sleep(2)

#%%
# Now let's say we store that jobid somewhere. How do we get info later?
# We know that if we ran flux jobs -a on the command line, we'd see the job.
res = subprocess.getoutput('flux jobs -a')
print(res)
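
#%%
# The same listing is available natively from Python via the JobList
# helper that the flux jobs command itself uses. A minimal sketch, with
# the caveat (an assumption) that the default filters show active jobs only:
from flux.job import JobList

for job in JobList(handle).jobs():
    print(job.id, job.name)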

#%%
# And if you are an expert user, you know that you can see metadata for a job.
# Run without a key, this command lists the keys available to you.
res = subprocess.getoutput(f'flux job info {jobid} | true')
print(res)

#%%
# And since the underlying logic here queries the flux KVS (key-value store),
# we can select one of those keys to view. For example, here is the jobspec.
res = subprocess.getoutput(f'flux job info {jobid} jobspec')
print(res)
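
#%%
# Since the jobspec key holds JSON, we can parse the CLI output directly.
# A small sketch (assuming res contains only the raw jobspec document),
# reading back the command we submitted:
parsed = json.loads(res)
print(parsed["tasks"][0]["command"])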

#%%
# This is great, but ideally we can get this metadata directly from Python.
# First, here is a way to get basic jobinfo. Given we start with a string jobid,
# we will first want to parse it back into a Flux JobID, and then prepare
# a payload to the Job List RPC to say "give me all the attributes back"
fluxjob = flux.job.JobID(jobid)
payload = {"id": fluxjob, "attrs": ["all"]}
rpc = flux.job.list.JobListIdRPC(handle, "job-list.list-id", payload)
jobinfo = rpc.get_job()
print(json.dumps(jobinfo, indent=4))
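
#%%
# Recent flux-core releases also expose a convenience wrapper that builds
# this same RPC for you. A sketch, assuming flux.job.job_list_id is
# available in your installation:
rpc2 = flux.job.job_list_id(handle, fluxjob, attrs=["all"])
print(json.dumps(rpc2.get_job(), indent=4))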

#%%
# You can also retrieve less commonly used (and thus less exposed) metadata,
# such as the emoji state!
info = rpc.get_jobinfo()
print(info.__dict__)

#%%
# With either of the above approaches, we don't get anything back about our
# original jobspec! That's because we need to query the KVS for that. Notice here we
# have metadata like the current working directory (cwd).
kvs = flux.job.job_kvs(handle, jobid)
jobspec = kvs.get('jobspec')
print(json.dumps(jobspec))
time.sleep(2)
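
#%%
# The jobspec we read back is a plain dictionary following the jobspec V1
# schema, so nested fields are easy to reach. For example, the cwd we set
# earlier lives under attributes -> system:
cwd = jobspec.get("attributes", {}).get("system", {}).get("cwd")
print(f"The job ran with cwd: {cwd}")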

#%%
# Finally, to watch (or stream) output, you can do the following.
# Each line here is a JSON structure that you can further parse.
# As an example, if "data" is present as a key, it is usually output.
for line in flux.job.event_watch(handle, jobid, "guest.output"):
    print(line)
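
#%%
# If you only want the output text itself, each event from event_watch
# carries a context dictionary. A sketch, assuming output events follow
# the RFC 24 eventlog format where written data sits under the "data" key:
for event in flux.job.event_watch(handle, jobid, "guest.output"):
    data = event.context.get("data")
    if data is not None:
        print(data, end="")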



1 change: 1 addition & 0 deletions auto_examples/example_job_info_api.py.md5
@@ -0,0 +1 @@
553b1d62e9c0adeedf1804adf3134c46