testing out idea for settings
Signed-off-by: vsoch <[email protected]>
vsoch committed Mar 19, 2024
1 parent 6d21028 commit 2b6b674
Showing 7 changed files with 123 additions and 95 deletions.
58 changes: 43 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -13,18 +13,34 @@ for a specific cluster's scheduler. If you think it looks too simple then I'd say
## Usage

A transformer provides one or more steps for the jobspec to be transformed and understood for a particular
execution environment. They include:
execution environment.

### Steps

Steps include:

| Name | Description |
|--------|-------------|
| write | write a file in the staging directory |
| stage | stage a file across nodes |
| set | a step to define a global setting |
| copy | copy a file into staging (currently just local) |
| submit | submit the job |
| batch | submit the job with a batch command (more common in HPC) |
| auth | authenticate with some service |

Note that for the above, we assume a shared filesystem unless a setting directs otherwise.
These are the basic steps that @vsoch needs now for scheduling experiments; more can be added (or tweaked) as needed.
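As a sketch, these steps compose in the `transform` section of a task. The following minimal transform (mirroring the pattern used in the example later in this README) writes a script and submits it:

```yaml
transform:
  - step: write
    filename: job.sh
    executable: true

  - step: submit
    filename: job.sh
    wait: true
```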

### Settings

Any "set" directive defines a global setting on the transform. For example:

- stage: defines the staging directory. If not set, a temporary directory is created
- sharedfs: true or false to say whether the filesystem is shared (defaults to true)

For `sharedfs`, it would be ideal to have a default that is specific to the transformer, but unfortunately this could be true or false
for flux, so it has to be set explicitly. But this might be an interesting compatibility thing to test.
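The runner folds these directives into a settings lookup as it parses steps. A minimal sketch of that behavior (`apply_directive` is an illustrative name, not the actual jobspec API; the valid settings and `sharedfs` default mirror `jobspec/defaults.py`):

```python
# Sketch: fold "set"/"unset" directives into a running settings dict.
# Names here are illustrative, not the exact jobspec API.
valid_settings = {"sharedfs", "stage"}
default_settings = {"sharedfs": True}


def apply_directive(settings, step):
    """Apply one set/unset directive, rejecting unknown keys (typos)."""
    key = step.get("key")
    if not key:
        return
    if key not in valid_settings:
        raise ValueError(f"{key} is not a known setting.")
    if step["step"] == "set":
        settings[key] = step.get("value")
    elif step["step"] == "unset":
        settings.pop(key, None)


settings = dict(default_settings)
for step in [
    {"step": "set", "key": "stage", "value": "/tmp/path-for-workflow"},
    {"step": "set", "key": "sharedfs", "value": False},
]:
    apply_directive(settings, step)
# settings now holds both the custom stage and the overridden sharedfs
```

Each later step would then receive a snapshot of the settings accumulated before it.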

### Example

This example will assume receiving a Jobspec on a flux cluster.
@@ -125,9 +141,6 @@ transform:
filename: install.sh
executable: true

- step: stage
filename: install.sh

- step: submit
filename: install.sh
wait: true
@@ -136,21 +149,36 @@ transform:
filename: job.sh
executable: true

- step: stage
filename: job.sh

- step: submit
filename: job.sh
wait: true
```
The above assumes we don't have a shared filesystem, and the receiving cluster has some cluster-specific method for staging or
file mapping. It could be ssh, or a filemap, or something else. For an ephemeral cluster API, it might be an interaction with
a storage provider, or just adding the file to an API call that will (in and of itself) do that creation, akin to a startup script for
an instance in Terraform. It really doesn't matter - the user can expect the file to be written and shared across nodes.
This is not intended to be a workflow or build tool - it simply is a transformational layer that a jobspec can provide
to setup a specific cluster environment. It works with a jobspec in that you define your filenames (scripts) in the tasks->scripts
directive. It also uses a plugin design, so a cluster or institution can write a custom transformer to install, and it will be discovered
The above assumes we have a shared filesystem. If we don't set the stage manually, a temporary directory is created; to use a custom staging directory instead, set it explicitly:
```yaml
- step: set
  key: stage
  value: /tmp/path-for-workflow
```
If we didn't have a shared filesystem, we would need to provide that detail. It's really akin
to a subsystem detail, because a job that assumes a shared fs won't be compatible.
```yaml
- step: set
key: sharedfs
value: false
```
Whenever there is a copy (not shown), this assumes the receiving cluster has some cluster-specific method for copy or
file mapping, even in the case without a shared filesystem. It could be ssh, or a filemap, or something else.
For an ephemeral cluster API, it might be an interaction with a storage provider, or just adding the file to an API call that
will (in and of itself) do that creation, akin to a startup script for an instance in Terraform. It really doesn't matter -
the user can expect the file to be written and shared across nodes. This is not intended to be a workflow or build tool -
it is simply a transformational layer that a jobspec can provide to set up a specific cluster environment. It works with a
jobspec in that you define your filenames (scripts) in the tasks->scripts directive. It also uses a plugin design, so a
cluster or institution can write a custom transformer to install, and it will be discovered
by name. This is intended to work with the prototype [rainbow](https://github.com/converged-computing/rainbow) scheduler.
Jobspec is an entity of [flux-framework](https://flux-framework.org).
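The plugin discovery described above can be sketched as a registry keyed by transformer name (hypothetical names below, not the actual jobspec API):

```python
# Sketch of name-based transformer discovery: each transformer class declares
# a `name`, and a registry maps names to classes so a cluster-specific plugin
# can be looked up at parse time. Names here are illustrative.
registry = {}


def register(cls):
    """Class decorator that records a transformer under its declared name."""
    registry[cls.name] = cls
    return cls


@register
class FluxTransformer:
    name = "flux"


def get_transformer(name):
    """Instantiate a registered transformer, failing loudly on unknown names."""
    if name not in registry:
        raise ValueError(f"Transformer {name} is not known")
    return registry[name]()
```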
5 changes: 0 additions & 5 deletions examples/hello-world-jobspec.yaml
@@ -18,11 +18,6 @@ task:
filename: job.sh
executable: true

# This is only provided as an example - in the devcontainer it's just one physical machine!
# and filemap I think will likely not work
# - step: stage
# filename: job.sh

- step: submit
filename: job.sh

6 changes: 1 addition & 5 deletions examples/hello-world-wait-jobspec.yaml
@@ -18,13 +18,9 @@ task:
filename: job.sh
executable: true

# This is only provided as an example - in the devcontainer it's just one physical machine!
#- step: stage
# filename: install.sh

- step: submit
filename: job.sh
wait: true
watch: true

scripts:
- name: job.sh
2 changes: 2 additions & 0 deletions jobspec/defaults.py
@@ -0,0 +1,2 @@
valid_settings = {"sharedfs", "stage"}
sharedfs = True
39 changes: 37 additions & 2 deletions jobspec/transform.py → jobspec/runner.py
@@ -1,8 +1,10 @@
import copy
import os
import sys

# This imports the latest version
import jobspec.core as js
import jobspec.defaults as defaults
import jobspec.utils as utils
from jobspec.logger import LogColors

@@ -35,27 +37,59 @@ def register_step(cls, step, name=None):
name = name or step.name
cls.steps[name] = step

def update_settings(self, settings, typ, step):
"""
Update settings, either set or unset
"""
if "key" not in step or "value" not in step:
return
if not step["key"]:
return
# This is important for typos, etc.
if step["key"] not in defaults.valid_settings:
raise ValueError(f"{step['key']} is not a known setting.")
if typ == "set":
settings[step["key"]] = step["value"]
elif typ == "unset" and step["key"] in settings:
del settings[step["key"]]

def parse(self, jobspec):
"""
parse validates transform logic and returns steps
parse validates transform logic and returns steps.
We also look for global variables for steps.
"""
# We will return a listing of steps to complete
# Each step is provided all settings that are provided
# before it
steps = []

# Each filename directive must have a matching script
# It could be the case it exists (and we might mark that)
# but not for the time being
task = jobspec.jobspec.get("task")

# Global set settings
settings = {"sharedfs": defaults.sharedfs}

# Validate each step
for i, step in enumerate(task.get("transform")):
# The step must be known to the transformer
name = step.get("step")
if not name:
raise ValueError(f"Step at index {i} is missing a name")

# If it's a set or unset, add to settings
if name == "set" or name == "unset":
self.update_settings(settings, name, step)
continue

if name not in self.steps:
raise ValueError(f"Step {name} is not known to transformer {self.name}")

# This ensures we get the exact state of settings at this level
step["settings"] = copy.deepcopy(settings)

# Instantiate the new step (does extra validation), provided entire jobspec
new_step = self.steps[name](jobspec.jobspec, step)
steps.append(new_step)
@@ -76,7 +110,8 @@ def run(self, filename):

# Run each step to submit the job, and that's it.
for step in steps:
self.run_step(step, stage)
step_stage = stage or step["settings"].get("stage")
self.run_step(step, step_stage)

def run_step(self, step, stage):
"""
103 changes: 36 additions & 67 deletions jobspec/transformer/flux.py
Expand Up @@ -12,8 +12,8 @@

import jobspec.steps as steps
import jobspec.utils as utils
from jobspec.runner import TransformerBase
from jobspec.steps.base import StepBase
from jobspec.transform import TransformerBase

logger = logging.getLogger("jobspec-flux")

@@ -44,7 +44,9 @@ def __init__(self, *args, **kwargs):

class stage(StepBase):
"""
A stage step uses flux filemap to stage across nodes
A copy step uses flux filemap to stage across nodes
This assumes we don't have a shared filesystem. It is skipped if we do.
"""

name = "stage"
@@ -53,6 +55,15 @@ def run(self, stage, *args, **kwargs):
"""
Run the stage step = fall back to filename for now
"""
# If we have a sharedfs, return early, the write will have written it there
sharedfs = self.options.get("settings", {}).get("sharedfs") is True
if sharedfs:
return

# Sanity check staging directory exists across nodes
cmd = ["flux", "exec", "-r", "all", "-x", "0", "mkdir", "-p", stage]
utils.run_command(cmd, check_output=True)

name = str(uuid.uuid4())
filename = self.options["filename"]
cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename]
@@ -99,51 +110,18 @@ def run(self, stage, *args, **kwargs):

# I don't think batch has python bindings?
filename = self.options.get("filename")
cmd = ["flux", "batch"]
cmd = ["flux", "batch", "--cwd", stage]
if nodes:
cmd += ["-N", str(nodes)]
if tasks:
cmd += ["-n", str(tasks)]
cmd.append(filename)

# Would be nice if this was exposed as "from jobspec"
# https://github.com/flux-framework/flux-core/blob/master/src/bindings/python/flux/cli/batch.py#L109-L120
with utils.workdir(stage):
res = utils.run_command(cmd, check_output=True)

# 👀️ 👀️ 👀️
jobid = res["message"].strip()
wait = self.options.get("wait") is True
if wait:
watch_job(handle, jobid)
return jobid


def watch_job(handle, jobid):
"""
Shared function to watch a job
"""
import flux.job

if isinstance(jobid, str):
jobid = flux.job.JobID(jobid)

print()
watcher = flux.job.watcher.JobWatcher(
handle,
progress=False,
jps=False, # show throughput with progress
log_events=False,
log_status=True,
labelio=False,
wait=True,
watch=True,
)
watcher.start()
watcher.add_jobid(jobid)
handle.reactor_run()


class submit(StepBase):
name = "submit"

@@ -157,42 +135,33 @@ def validate(self):

def run(self, stage, *args, **kwargs):
"""
Run the submit step
"""
import flux.job
Run the submit step.
# Parse jobspec into yaml stream, because it doesn't have support for json stream
# Also remove "experimental" feature lol
js = copy.deepcopy(self.jobspec)
for key in ["scripts", "transform", "resources"]:
if key in js.get("task"):
del js["task"][key]
The python bindings are giving me weird errors.
"""
slot = self.flatten_slot()
nodes = slot.get("node")
tasks = slot.get("core")

# Use the filename or fall back to command
# I don't think batch has python bindings?
filename = self.options.get("filename")

# Task -> tasks
if "task" in js:
task = js.get("task")
del js["task"]
js["tasks"] = [task]
if "command" not in task:
task["command"] = ["/bin/bash", filename]

# It requires attributes, even if it's empty...
if "attributes" not in js:
js["attributes"] = {"system": {"duration": 3600, "cwd": stage}}

# Are we watching or waiting? (note that watching implies waiting)
cmd = ["flux", "submit", "--cwd", stage]
watch = self.options.get("watch") is True
wait = self.options.get("wait") is True or watch is True
flux_jobspec = flux.job.JobspecV1.from_yaml_stream(yaml.dump(js))
jobid = flux.job.submit(handle, flux_jobspec, waitable=wait)

# 👀️ 👀️ 👀️
if watch:
watch_job(handle, jobid)
return jobid.f58plain
cmd.append("--watch")
if nodes:
cmd += ["-N", str(nodes)]
if tasks:
cmd += ["-n", str(tasks)]
cmd += ["/bin/bash", filename]
print("\n" + " ".join(cmd))

with utils.workdir(stage):
res = utils.run_command(cmd, check_output=True, stream=watch)

if not watch:
jobid = res["message"].strip()
return jobid


# A transformer can register shared steps, or custom steps
5 changes: 4 additions & 1 deletion jobspec/utils.py
@@ -95,5 +95,8 @@ def run_command(cmd, stream=False, check_output=False, return_code=0):

# Check the output and raise an error if not success
if check_output and t[1] != return_code:
raise ValueError(output["message"].strip())
if output["message"]:
raise ValueError(output["message"].strip())
else:
raise ValueError(f"Failed execution, return code {t[1]}")
return output
