Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

example: running from within python #5

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ jobs:
which jobspec
flux start jobspec run ./examples/hello-world-jobspec.yaml
flux start jobspec run ./examples/hello-world-batch.yaml
flux start python3 ./examples/flux/receive-job.py
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ execution environment. They include:

These are the basic steps that @vsoch needs now for scheduling experiments, and more can be added (or tweaked) if needed.

### Steps

### Example

This example will assume receiving a Jobspec on a flux cluster.

#### 1. Start Flux

Start up the development environment to find yourself in a container with flux. Start a test instance:

```bash
Expand All @@ -53,6 +55,8 @@ Ensure you have jobspec installed! Yes, we are vscode, installing to the contain
sudo pip install -e .
```

#### 2. Command Line Examples

We are going to run the [examples/hello-world-jobspec.yaml](examples/hello-world-jobspec.yaml). This setup is way overly
complex for this because we don't actually need to do any staging or special work, but it's an example, so intended to be so.
Also note that the design of this file is subject to change. For example, we don't have to include the transform directly in the
Expand All @@ -77,6 +81,25 @@ jobspec run -t flux ./examples/hello-world-wait-jobspec.yaml
jobspec run --transformer flux ./examples/hello-world-wait-jobspec.yaml
```

#### 3. Python Examples

It could also be the case that you want something running inside a lead broker instance to receive Jobspecs incrementally and then
run them. This Python example can help with that by showing how to accomplish the same, but from within Python.

```bash
python3 ./examples/flux/receive-job.py
```
```console
$ python3 examples/flux/receive-job.py
=> step write OK
=> step submit f7aChzM3u OK
=> step write OK
=> step submit f7aDYuwMH OK
```

Just for fun (posterity) I briefly tried having emoji here:

![img/emoji.png](img/emoji.png)

### Details

Expand Down
54 changes: 54 additions & 0 deletions examples/flux/jobspec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
version: 1
resources:
- count: 4
type: node
with:
- count: 1
label: fractal-descriptive
type: slot
with:
- count: 4
type: core

task:
transform:
- step: write
filename: install.sh
- step: submit
filename: install.sh
wait: true
- step: write
filename: job.sh
- step: submit
filename: job.sh

scripts:
- name: install.sh
content: |
#!/bin/bash
echo "This is an install step"

- name: job.sh
content: |
#!/bin/bash
echo "This is task ${FLUX_TASK_RANK}"
if [[ "${FLUX_TASK_RANK}" == "0" ]]; then
echo "Hello I am the leader: $(hostname)"
else
echo "Hello I am a follower: $(hostname)"
fi

count:
per_slot: 1
resources:
hardware:
hardware.gpu.available: 'no'
software:
go: "1.20"
io.archspec:
cpu.target: amd64
os:
os.name: Ubuntu 22.04.3 LTS
os.release: 22.04.3
os.vendor: ubuntu
slot: fractal-descriptive
22 changes: 22 additions & 0 deletions examples/flux/receive-job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os
from jobspec.plugin import get_transformer_registry

here = os.path.abspath(os.path.dirname(__file__))
jobspec_file = os.path.join(here, "jobspec.yaml")

def main():

# Get the registry
registry = get_transformer_registry()

# The cool thing about transformers is that you can have
# one tiny server that acts an an interface to several cloud (or other)
# APIs. A transformer doesn't have to be for cluster batch, it could
# be for an API to an emphemeral resource
plugin = registry.get_plugin("flux")()

# Run the plugin with the jobspec
plugin.run(jobspec_file)

if __name__ == "__main__":
main()
Binary file added img/emoji.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions jobspec/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def run_step(self, step, stage):
prefix = f"step {step.name}".ljust(15)
print(f"=> {LogColors.OKCYAN}{prefix}{LogColors.ENDC}", end="")
try:
step.run(stage)
print(f"{LogColors.OKGREEN} OK{LogColors.ENDC}")
out = (step.run(stage) or "").ljust(25)
print(f"{LogColors.OKBLUE}{out}{LogColors.ENDC} {LogColors.OKGREEN}OK{LogColors.ENDC}")
except Exception as e:
print(f"\n{LogColors.RED}{str(e)}{LogColors.ENDC}")
sys.exit()
Expand Down
14 changes: 10 additions & 4 deletions jobspec/transformer/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,23 +168,29 @@ def run(self, stage, *args, **kwargs):
if key in js.get("task"):
del js["task"][key]

# Use the filename or fall back to command
filename = self.options.get("filename")

# Task -> tasks
if "task" in js:
task = js.get("task")
del js["task"]
js["tasks"] = [task]
if "command" not in task:
task["command"] = ["/bin/bash", filename]

# It requires attributes, even if it's empty...
if "attributes" not in js:
js["attributes"] = {"system": {"duration": 3600, "cwd": stage}}

# Are we watching?
wait = self.options.get("wait") is True
# Are we watching or waiting (note that watching implies waiting?
watch = self.options.get("watch") is True
wait = self.options.get("wait") is True or watch is True
flux_jobspec = flux.job.JobspecV1.from_yaml_stream(yaml.dump(js))
jobid = flux.job.submit(handle, flux_jobspec, waitable=True)
jobid = flux.job.submit(handle, flux_jobspec, waitable=wait)

# 👀️ 👀️ 👀️
if wait:
if watch:
watch_job(handle, jobid)
return jobid.f58plain

Expand Down
2 changes: 1 addition & 1 deletion jobspec/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.0.1"
__version__ = "0.0.11"
AUTHOR = "Vanessa Sochat"
AUTHOR_EMAIL = "[email protected]"
NAME = "jobspec"
Expand Down