
Commit: testing out idea for settings
Signed-off-by: vsoch <[email protected]>
vsoch committed Mar 19, 2024
1 parent 6d21028 commit bd7d29a
Showing 6 changed files with 90 additions and 27 deletions.
58 changes: 43 additions & 15 deletions README.md
@@ -13,18 +13,34 @@ for a specific cluster's scheduler. If you think it looks too simple then I'd say
## Usage

A transformer provides one or more steps for the jobspec to be transformed and understood for a particular
execution environment.

### Steps

Steps include:

| Name | Description |
|--------|-------------|
| write | write a file in the staging directory |
| stage | stage a file across nodes |
| set | a step to define a global setting |
| copy | copy a file into staging (currently just local) |
| submit | submit the job |
| batch | submit the job with a batch command (more common in HPC) |
| auth | authenticate with some service |

Note that for the above, we assume a shared filesystem unless the `sharedfs` setting (described below) directs otherwise.
These are the basic steps that @vsoch needs now for scheduling experiments, and more can be added (or tweaked) if needed.
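As a quick illustration, here is a sketch of what a `copy` step could look like; the fields shown are assumed by analogy with `write` and `stage`, not confirmed here:

```yaml
- step: copy
  # Assumed field: the local file to copy into the staging directory
  filename: ./data/parameters.json
```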

### Settings

Any "set" directive can be used to set a more global setting on the transform. For example:

- stage: defines the staging directory. If not set, a temporary directory will be created.
- sharedfs: true or false to indicate whether the filesystem is shared (defaults to true, per `jobspec/defaults.py`)

For `sharedfs` it would be ideal to have a default that is specific to the transformer, but unfortunately this can be either true or false
for flux, so it has to be set explicitly. This might be an interesting compatibility attribute to test.
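Putting the two settings together, a transform could declare them up front (the staging path here is only illustrative):

```yaml
- step: set
  key: stage
  value: /tmp/workflow-staging

- step: set
  key: sharedfs
  value: false
```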

### Example

This example will assume receiving a Jobspec on a flux cluster.
@@ -125,9 +141,6 @@ transform:
      filename: install.sh
      executable: true

    - step: stage
      filename: install.sh

    - step: submit
      filename: install.sh
      wait: true
@@ -136,21 +149,36 @@ transform:
      filename: job.sh
      executable: true

    - step: stage
      filename: job.sh

    - step: submit
      filename: job.sh
      wait: true
```
The above assumes we have a shared filesystem. If we wanted a custom staging directory instead of
the default temporary one, we could set it manually:
```yaml
- step: set
  key: stage
  value: /tmp/path-for-workflow
```
If we didn't have a shared filesystem, we would need to provide that detail. It's really akin
to a subsystem detail, because a job that assumes a shared filesystem won't be compatible.
```yaml
- step: set
  key: sharedfs
  value: false
```
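The transformer also understands an `unset` directive (handled alongside `set` in `jobspec/transform.py` below), which removes a previously defined setting - for example, to fall back to a temporary staging directory:

```yaml
- step: unset
  key: stage
```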
Whenever there is a copy step (not shown), we assume the receiving cluster has some cluster-specific method for copying or
file mapping, even in the case without a shared filesystem. It could be ssh, a filemap, or something else.
For an ephemeral cluster API, it might be an interaction with a storage provider, or just adding the file to an API call that
will (in and of itself) do that creation, akin to a startup script for an instance in Terraform. It really doesn't matter -
the user can expect the file to be written and shared across nodes. This is not intended to be a workflow or build tool -
it is simply a transformational layer that a jobspec can provide to set up a specific cluster environment. It works with a
jobspec in that you define your filenames (scripts) in the tasks->scripts directive. It also uses a plugin design, so a
cluster or institution can write a custom transformer to install, and it will be discovered
by name. This is intended to work with the prototype [rainbow](https://github.com/converged-computing/rainbow) scheduler.
Jobspec is an entity of [flux-framework](https://flux-framework.org).
5 changes: 0 additions & 5 deletions examples/hello-world-jobspec.yaml
@@ -18,11 +18,6 @@ task:
      filename: job.sh
      executable: true

    # This is only provided as an example - in the devcontainer it's just one physical machine!
    # and filemap I think will likely not work
    # - step: stage
    #   filename: job.sh

    - step: submit
      filename: job.sh

6 changes: 1 addition & 5 deletions examples/hello-world-wait-jobspec.yaml
@@ -18,13 +18,9 @@ task:
      filename: job.sh
      executable: true

    # This is only provided as an example - in the devcontainer it's just one physical machine!
    # - step: stage
    #   filename: install.sh

    - step: submit
      filename: job.sh
      wait: true
      watch: true

  scripts:
    - name: job.sh
1 change: 1 addition & 0 deletions jobspec/defaults.py
@@ -0,0 +1 @@
sharedfs = True
34 changes: 33 additions & 1 deletion jobspec/transform.py
@@ -1,8 +1,10 @@
import copy
import os
import sys

# This imports the latest version
import jobspec.core as js
import jobspec.defaults as defaults
import jobspec.utils as utils
from jobspec.logger import LogColors

@@ -35,27 +37,56 @@ def register_step(cls, step, name=None):
        name = name or step.name
        cls.steps[name] = step

    def update_settings(self, settings, typ, step):
        """
        Update settings, either set or unset
        """
        # Both set and unset need a key; only set also needs a value
        if not step.get("key"):
            return
        if typ == "set" and "value" in step:
            settings[step["key"]] = step["value"]
        elif typ == "unset" and step["key"] in settings:
            del settings[step["key"]]

    def parse(self, jobspec):
        """
        parse validates transform logic and returns steps.
        We also look for global variables for steps.
        """
        # We will return a listing of steps to complete
        # Each step is provided all settings that are provided
        # before it
        steps = []

        # Each filename directive must have a matching script
        # It could be the case it exists (and we might mark that)
        # but not for the time being
        task = jobspec.jobspec.get("task")

        # Global set settings
        settings = {"sharedfs": defaults.sharedfs}

        # Validate each step
        for i, step in enumerate(task.get("transform")):
            # The step must be known to the transformer
            name = step.get("step")
            if not name:
                raise ValueError(f"Step in index {i} is missing a name")

            # If it's a set or unset, add to settings
            if name == "set" or name == "unset":
                self.update_settings(settings, name, step)
                continue

            if name not in self.steps:
                raise ValueError(f"Step {name} is not known to transformer {self.name}")

            # This ensures we get the exact state of settings at this level
            step["settings"] = copy.deepcopy(settings)

            # Instantiate the new step (does extra validation), provided entire jobspec
            new_step = self.steps[name](jobspec.jobspec, step)
            steps.append(new_step)
@@ -76,6 +107,7 @@ def run(self, filename):

        # Run each step to submit the job, and that's it.
        # A step-specific "stage" setting takes effect when no stage is provided
        for step in steps:
            step_stage = stage or step["settings"].get("stage")
            self.run_step(step, step_stage)

    def run_step(self, step, stage):
13 changes: 12 additions & 1 deletion jobspec/transformer/flux.py
@@ -44,7 +44,9 @@ def __init__(self, *args, **kwargs):

class stage(StepBase):
    """
    A copy step uses flux filemap to stage across nodes.

    This assumes we don't have a shared filesystem. It is skipped if we do.
    """

    name = "stage"
@@ -53,6 +55,15 @@ def run(self, stage, *args, **kwargs):
"""
Run the stage step = fall back to filename for now
"""
# If we have a sharedfs, return early, the write will have written it there
sharedfs = self.options.get("settings", {}).get("sharedfs") is True
if sharedfs:
return

# Sanity check staging directory exists across nodes
cmd = ["mkdir", "-p", stage]
utils.run_command(cmd, check_output=True)

name = str(uuid.uuid4())
filename = self.options["filename"]
cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename]
