Merge pull request #2 from compspec/add-batch-support
feat: support for batch and stage
vsoch authored Mar 18, 2024
2 parents cef40e3 + f6f2cf1 commit dfab7eb
Showing 20 changed files with 312 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -3,3 +3,4 @@ build
dist
env
.eggs
__pycache__
67 changes: 60 additions & 7 deletions README.md
@@ -12,6 +12,21 @@ for a specific cluster's scheduler. If you think it looks too simple then I'd say

## Usage

A transformer provides one or more steps to transform a jobspec so it can be understood by a particular
execution environment. They include:

| Name | Description |
|--------|-------------|
| write | write a file in the staging directory |
| stage | stage a file across nodes |
| submit | submit the job |
| batch | submit the job with a batch command (more common in HPC) |
| auth | authenticate with some service |

These are the basic steps that @vsoch needs now for scheduling experiments, and more can be added (or tweaked) if needed.
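The dispatch implied by the table can be sketched as a minimal loop. This is a hypothetical illustration only - the `transform` list, step names, and lambda handlers below are stand-ins, not the library's real step classes:

```python
# Hedged sketch: how a transformer might walk a jobspec "transform" section
# and dispatch each named step. Handlers here are illustrative stand-ins.

transform = [
    {"step": "write", "filename": "job.sh", "executable": True},
    {"step": "submit", "filename": "job.sh"},
]

# Illustrative registry mapping step names (as in the table) to handlers
registered = {
    "write": lambda opts: f"write {opts['filename']}",
    "submit": lambda opts: f"submit {opts['filename']}",
}

for entry in transform:
    handler = registered[entry["step"]]
    print(handler(entry))
```

The real transformer classes follow the same shape: look up each step by name, then run it with the options from the jobspec.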

### Example

Start up the development environment to find yourself in a container with flux. Start a test instance:
@@ -45,9 +60,24 @@ jobspec - it can be a file that the jobspec writes, and then the command is issued
it there for the time being, mostly because it looks nicer. I'm sure someone will disagree with me about that.

```bash
# Example without watching (waiting) for output
jobspec run ./examples/hello-world-jobspec.yaml

# Example that shows waiting for output
jobspec run ./examples/hello-world-wait-jobspec.yaml

# Example with batch using flux
jobspec run ./examples/hello-world-batch.yaml
```

Note that the default transformer is flux, so the above are equivalent to:

```bash
jobspec run -t flux ./examples/hello-world-wait-jobspec.yaml
jobspec run --transformer flux ./examples/hello-world-wait-jobspec.yaml
```


### Details

As an example, although you *could* submit a job with a command ready to go - assuming your cluster has the
@@ -105,7 +135,9 @@ Jobspec is an entity of [flux-framework](https://flux-framework.org).
#### Why not rely on Flux internals?
We want a Jobspec to be able to handle a transformation of some logic (the above) into an execution that might not involve flux at all. It could be another workload manager (e.g., Slurm) or it could be a service that submits to some cloud batch API.
If we lived in a universe of just flux, sure we wouldn't need this. But the world is more than Flux, and we want to extend our Jobspec to that world.
So we want a Jobspec to be able to handle a transformation of some logic (the above) into an execution that might not involve flux at all. It could be another workload manager (e.g., Slurm),
Kubernetes, or it could be a service that submits to some cloud batch API.
#### What are all the steps allowed?
@@ -130,12 +162,6 @@ There are several likely means of interacting with this library:
For the example usage here, and since the project I am working on is concerned with Flux, we will start with the simplest case: a client running inside a flux instance (meaning it can import flux) that reads in a jobspec with a section defining a set of transforms, then issues the commands to stage the setup and uses flux to run the work defined by the jobspec.
## TODO
- write the hello world example with flux
- add the staging example
- write the same, but using batch
## Developer
### Organization
@@ -144,6 +170,33 @@ While you can write an external transformer (as a plugin) a set of core transfor
- [jobspec/transformer](jobspec/transformer): core transformer classes that ship internally here.
### Writing a Transformer
For now, the easiest thing to do is add a single file (named by your transformer) to [jobspec/transformer](jobspec/transformer)
and copy the precedent in the file. A transformer is minimally a class with a name, description, and some number of steps.
You can then use the provided steps in [jobspec/steps](jobspec/steps) or use the `StepBase` to write your own. At the end of
your transformer file you simply need to register the steps you want to use:

```python
# A transformer can register shared steps, or custom steps
Transformer.register_step(steps.WriterStep)
Transformer.register_step(batch)
Transformer.register_step(submit)
Transformer.register_step(stage)
```

If there is a step you want the user to be able to define (but have your transformer skip, for whatever reason you might have)
just register the empty step with the name you want to skip. As an example, let's say my transformer has no concept of a stage
(sharing a file across separate nodes) because it has a shared filesystem. I might want to do:

```python
import jobspec.steps as steps
# This will not fail validation as an unknown step, but will skip it
Transformer.register_step(steps.EmptyStep, name="stage")
```


## License

HPCIC DevTools is distributed under the terms of the MIT license.
37 changes: 37 additions & 0 deletions examples/hello-world-batch.yaml
@@ -0,0 +1,37 @@
# TODO test these out on x86, then create arm specs
version: 1
resources:
- count: 4
type: node
with:
- count: 1
label: hello-world
type: slot
with:
- count: 4
type: core

task:
transform:
- step: write
filename: batch.sh
executable: true

- step: batch
filename: batch.sh
# wait: true

scripts:
- name: batch.sh
content: |
#!/bin/bash
flux submit -N 1 --watch echo what is the meaning
flux submit -N 1 --watch echo of all of this
count:
per_slot: 1
resources:
hardware:
hardware.gpu.available: 'no'
io.archspec:
cpu.target: amd64
slot: hello-world
5 changes: 3 additions & 2 deletions examples/hello-world-jobspec.yaml
@@ -19,8 +19,9 @@ task:
executable: true

# This is only provided as an example - in the devcontainer it's just one physical machine!
#- step: stage
# filename: install.sh
# and filemap I think will likely not work
# - step: stage
# filename: job.sh

- step: submit
filename: job.sh
Binary file removed jobspec/cli/__pycache__/__init__.cpython-310.pyc
Binary file not shown.
Binary file removed jobspec/cli/__pycache__/run.cpython-310.pyc
Binary file not shown.
Binary file removed jobspec/plugin/__pycache__/__init__.cpython-310.pyc
Binary file not shown.
Binary file removed jobspec/plugin/__pycache__/registry.cpython-310.pyc
Binary file not shown.
1 change: 1 addition & 0 deletions jobspec/steps/__init__.py
@@ -1 +1,2 @@
from .empty import EmptyStep
from .fileio import WriterStep
Binary file removed jobspec/steps/__pycache__/__init__.cpython-310.pyc
Binary file not shown.
Binary file removed jobspec/steps/__pycache__/base.cpython-310.pyc
Binary file not shown.
Binary file removed jobspec/steps/__pycache__/fileio.cpython-310.pyc
Binary file not shown.
21 changes: 21 additions & 0 deletions jobspec/steps/base.py
@@ -1,3 +1,6 @@
import jobspec.steps.helpers as helpers


class StepBase:
"""
A base step describes the design of a step.
@@ -32,8 +35,26 @@ def _validate(self):
raise ValueError(f"Step {self.name} has undefined field {field}")

def validate(self):
"""
Validate the step
"""
pass

def flatten_slot(self):
"""
Find the task slot, flatten it, and return
"""
slot = self.jobspec["task"]["slot"]
resources = self.jobspec.get("resources", [])

# Traverse each section. There is usually only one I guess
for resource in resources:
flat = {}
if helpers.find_resources(flat, resource, slot):
break

return flat

@property
def scripts(self):
"""
18 changes: 18 additions & 0 deletions jobspec/steps/empty.py
@@ -0,0 +1,18 @@
import os
import stat

import jobspec.utils as utils

from .base import StepBase


class EmptyStep(StepBase):
"""
An empty step is used to declare that a step should be skipped
"""

def run(self, stage, *args, **kwargs):
"""
do nothing.
"""
pass
21 changes: 21 additions & 0 deletions jobspec/steps/helpers.py
@@ -0,0 +1,21 @@
def find_resources(flat, resource, slot, last_one=False):
"""
Unwrap a nested resource
"""
# We found a dominant subsystem resource
if "type" in resource and resource["type"] != "slot":
flat[resource["type"]] = resource["count"]

# The previous was the found slot, return
if last_one:
return True

# We found the slot, this is where we stop
if "type" in resource and resource["type"] == "slot":
last_one = True

# More traversing...
if "with" in resource:
for r in resource["with"]:
find_resources(flat, r, slot, last_one)
return flat
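To see what this traversal produces, here is a self-contained run of the same logic against a nested resource tree like the batch example above (the resource dict is adapted from `examples/hello-world-batch.yaml`):

```python
# Same traversal as helpers.find_resources above: collect counts by type
# while walking "with" children, stopping one level below the slot.
def find_resources(flat, resource, slot, last_one=False):
    # Record counts for non-slot resources (the slot itself has no count entry)
    if resource.get("type") != "slot":
        flat[resource["type"]] = resource["count"]
    if last_one:
        return True
    if resource.get("type") == "slot":
        last_one = True
    for child in resource.get("with", []):
        find_resources(flat, child, slot, last_one)
    return flat


resource = {
    "type": "node", "count": 4,
    "with": [
        {"type": "slot", "count": 1, "label": "hello-world",
         "with": [{"type": "core", "count": 4}]},
    ],
}
print(find_resources({}, resource, "hello-world"))  # {'node': 4, 'core': 4}
```

Note that resources above the slot (the node) are flattened too, which is what `flatten_slot` in `StepBase` relies on to find node and core counts for `flux batch`.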
8 changes: 6 additions & 2 deletions jobspec/transform.py
@@ -25,11 +25,15 @@ def __init__(self, **options):
setattr(self, key, value)

@classmethod
def register_step(cls, step):
def register_step(cls, step, name=None):
"""
Register a step class to the transformer
"""
cls.steps[step.name] = step
# Allow registering an empty step if needed
# An empty step does nothing - an explicit declaration
# by the transformer developer that the step is not needed
name = name or step.name
cls.steps[name] = step

def parse(self, jobspec):
"""
Binary file not shown.
Binary file removed jobspec/transformer/__pycache__/flux.cpython-310.pyc
Binary file not shown.
120 changes: 106 additions & 14 deletions jobspec/transformer/flux.py
@@ -6,6 +6,7 @@
import shlex
import subprocess
import time
import uuid

import yaml

@@ -41,6 +42,108 @@ def __init__(self, *args, **kwargs):
# Custom Flux steps - just write and register!


class stage(StepBase):
"""
A stage step uses flux filemap to stage across nodes
"""

name = "stage"

def run(self, stage, *args, **kwargs):
"""
Run the stage step - fall back to filename for now
"""
name = str(uuid.uuid4())
filename = self.options["filename"]
cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename]
utils.run_command(cmd, check_output=True)

# Assume we send to all ranks besides where we've already written it
# This will likely fail if the filesystem is shared
cmd = [
"flux",
"exec",
"--dir",
stage,
"-r",
"all",
"-x",
"0",
"flux",
"filemap",
"get",
"--tags",
name,
]
utils.run_command(cmd, check_output=False)

# Unmap to clear the memory map
cmd = ["flux", "filemap", "unmap", "--tags", name]
utils.run_command(cmd, check_output=True)


class batch(StepBase):
name = "batch"

def run(self, stage, *args, **kwargs):
"""
Run the batch step
"""
slot = self.flatten_slot()
nodes = slot.get("node")
tasks = slot.get("core")

# I'm pretty sure we need one of these
if not nodes and not tasks:
raise ValueError("slot is missing node or core, cannot direct to batch.")

# I don't think batch has python bindings?
filename = self.options.get("filename")
cmd = ["flux", "batch"]
if nodes:
cmd += ["-N", str(nodes)]
if tasks:
cmd += ["-n", str(tasks)]
cmd.append(filename)

# Would be nice if this was exposed as "from jobspec"
# https://github.com/flux-framework/flux-core/blob/master/src/bindings/python/flux/cli/batch.py#L109-L120
with utils.workdir(stage):
res = utils.run_command(cmd, check_output=True)

# 👀️ 👀️ 👀️
jobid = res["message"].strip()
wait = self.options.get("wait") is True
if wait:
# Need a Flux handle here to watch the job
import flux

watch_job(flux.Flux(), jobid)
return jobid


def watch_job(handle, jobid):
"""
Shared function to watch a job
"""
import flux.job

if isinstance(jobid, str):
jobid = flux.job.JobID(jobid)

print()
watcher = flux.job.watcher.JobWatcher(
handle,
progress=False,
jps=False, # show throughput with progress
log_events=False,
log_status=True,
labelio=False,
wait=True,
watch=True,
)
watcher.start()
watcher.add_jobid(jobid)
handle.reactor_run()


class submit(StepBase):
name = "submit"

@@ -82,23 +185,12 @@ def run(self, stage, *args, **kwargs):

# 👀️ 👀️ 👀️
if wait:
print()
watcher = flux.job.watcher.JobWatcher(
handle,
progress=False,
jps=False, # show throughput with progress
log_events=False,
log_status=True,
labelio=False,
wait=True,
watch=True,
)
watcher.start()
watcher.add_jobid(jobid)
handle.reactor_run()
watch_job(handle, jobid)
return jobid.f58plain


# A transformer can register shared steps, or custom steps
Transformer.register_step(steps.WriterStep)
Transformer.register_step(batch)
Transformer.register_step(submit)
Transformer.register_step(stage)