From bd7d29a547cb43a3ab171ef094b629c62988a16f Mon Sep 17 00:00:00 2001
From: vsoch
Date: Mon, 18 Mar 2024 20:39:18 -0600
Subject: [PATCH] testing out idea for settings

Signed-off-by: vsoch
---
 README.md                              | 58 +++++++++++++++++++-------
 examples/hello-world-jobspec.yaml      |  5 ---
 examples/hello-world-wait-jobspec.yaml |  6 +--
 jobspec/defaults.py                    |  1 +
 jobspec/transform.py                   | 34 ++++++++++++++-
 jobspec/transformer/flux.py            | 13 +++++-
 6 files changed, 90 insertions(+), 27 deletions(-)
 create mode 100644 jobspec/defaults.py

diff --git a/README.md b/README.md
index a151861..d9fca57 100644
--- a/README.md
+++ b/README.md
@@ -13,18 +13,34 @@ for a specific cluster's scheduler. If you think it looks too simple then I'd say
 
 ## Usage
 
 A transformer provides one or more steps for the jobspec to be transformed and understood for a particular
-execution environment. They include:
+execution environment.
+
+### Steps
+
+Steps include:
 
 | Name   | Description |
 |--------|-------------|
 | write  | write a file in the staging directory |
-| stage  | stage a file across nodes |
+| set    | define a global setting |
+| copy   | copy a file into staging (currently just local) |
 | submit | submit the job |
 | batch  | submit the job with a batch command (more common in HPC) |
 | auth   | authenticate with some service |
 
+Note that for the above, we assume a shared filesystem unless the sharedfs setting directs that this isn't the case.
 These are the basic steps that @vsoch needs now for scheduling experiments, and more can be added (or tweaked) if needed.
 
+### Settings
+
+Any "set" directive can be used to set a more global setting on the transform. For example:
+
+ - stage: defines the staging directory. If not set, it will be a temporary directory that is created for the run.
+ - sharedfs: true or false to say whether the filesystem is shared (defaults to true)
+
+For `sharedfs` it would be ideal to have a default that is specific to the transformer, but unfortunately this can be true or false
+for Flux, so it has to be set explicitly. But this might be an interesting compatibility thing to test.
+
 ### Example
 
 This example will assume receiving a jobspec on a Flux cluster.
@@ -125,9 +141,6 @@ transform:
     filename: install.sh
     executable: true
 
-  - step: stage
-    filename: install.sh
-
   - step: submit
     filename: install.sh
     wait: true
@@ -136,21 +149,36 @@ transform:
     filename: job.sh
     executable: true
 
-  - step: stage
-    filename: job.sh
-
   - step: submit
     filename: job.sh
     wait: true
 ```
 
-The above assumes we don't have a shared filesystem, and the receiving cluster has some cluster-specific method for staging or
-file mapping. It could be ssh, or a filemap, or something else. For an ephemeral cluster API, it might be an interaction with
-a storage provider, or just adding the file to an API call that will (in and of itself) do that creation, akin to a startup script for
-an instance in Terraform. It really doesn't matter - the user can expect the file to be written and shared across nodes.
-This is not intended to be a workflow or build tool - it simply is a transformational layer that a jobspec can provide
-to setup a specific cluster environment. It works with a jobspec in that you define your filenames (scripts) in the tasks->scripts
-directive. It also uses a plugin design, so a cluster or institution can write a custom transformer to install, and it will be discovered
+The above assumes we have a shared filesystem. By setting the stage manually:
+
+```yaml
+- step: set
+  key: stage
+  value: /tmp/path-for-workflow
+```
+
+we will use a custom staging directory instead of a temporary one that is created for us.
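+Based on the parser logic in jobspec/transform.py (below), a setting can also be cleared later in the
+transform with an "unset" step - clearing stage here is just an example:
+
+```yaml
+- step: unset
+  key: stage
+```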
+If we didn't have a shared filesystem, we would need to provide that detail. It's really akin
+to a subsystem detail, because a job that assumes a shared filesystem won't be compatible without one.
+
+```yaml
+- step: set
+  key: sharedfs
+  value: false
+```
+
+Whenever there is a copy step (not shown), we assume the receiving cluster has some cluster-specific method for copying or
+file mapping, even in the case without a shared filesystem. It could be ssh, or a filemap, or something else.
+For an ephemeral cluster API, it might be an interaction with a storage provider, or just adding the file to an API call that
+will (in and of itself) do that creation, akin to a startup script for an instance in Terraform. It really doesn't matter -
+the user can expect the file to be written and shared across nodes. This is not intended to be a workflow or build tool -
+it is simply a transformational layer that a jobspec can provide to set up a specific cluster environment. It works with a
+jobspec in that you define your filenames (scripts) in the tasks->scripts directive. It also uses a plugin design, so a
+cluster or institution can write a custom transformer to install, and it will be discovered
 by name. This is intended to work with the prototype [rainbow](https://github.com/converged-computing/rainbow) scheduler.
 Jobspec is an entity of [flux-framework](https://flux-framework.org).

diff --git a/examples/hello-world-jobspec.yaml b/examples/hello-world-jobspec.yaml
index faa5f3f..b5db450 100644
--- a/examples/hello-world-jobspec.yaml
+++ b/examples/hello-world-jobspec.yaml
@@ -18,11 +18,6 @@ task:
     filename: job.sh
     executable: true
 
-  # This is only provided as an example - in the devcontainer it's just one physicap machine!
-  # and filemap I think will likely not work
-  # - step: stage
-  #   filename: job.sh
-
   - step: submit
     filename: job.sh
 
diff --git a/examples/hello-world-wait-jobspec.yaml b/examples/hello-world-wait-jobspec.yaml
index 0b04e39..85c7261 100644
--- a/examples/hello-world-wait-jobspec.yaml
+++ b/examples/hello-world-wait-jobspec.yaml
@@ -18,13 +18,9 @@ task:
     filename: job.sh
     executable: true
 
-  # This is only provided as an example - in the devcontainer it's just one physicap machine!
-  #- step: stage
-  #  filename: install.sh
-
   - step: submit
     filename: job.sh
-    wait: true
+    watch: true
 
 scripts:
   - name: job.sh

diff --git a/jobspec/defaults.py b/jobspec/defaults.py
new file mode 100644
index 0000000..6bfc87e
--- /dev/null
+++ b/jobspec/defaults.py
@@ -0,0 +1 @@
+sharedfs = True

diff --git a/jobspec/transform.py b/jobspec/transform.py
index ae9ff5a..4cd26fa 100644
--- a/jobspec/transform.py
+++ b/jobspec/transform.py
@@ -1,8 +1,10 @@
+import copy
 import os
 import sys
 
 # This imports the latest version
 import jobspec.core as js
+import jobspec.defaults as defaults
 import jobspec.utils as utils
 from jobspec.logger import LogColors
 
@@ -35,11 +37,28 @@ def register_step(cls, step, name=None):
         name = name or step.name
         cls.steps[name] = step
 
+    def update_settings(self, settings, typ, step):
+        """
+        Update settings, either set or unset
+        """
+        # A key is always required; unset does not require a value
+        if "key" not in step or not step["key"]:
+            return
+        if typ == "set" and "value" in step:
+            settings[step["key"]] = step["value"]
+        elif typ == "unset" and step["key"] in settings:
+            del settings[step["key"]]
+
     def parse(self, jobspec):
         """
-        parse validates transform logic and returns steps
+        parse validates transform logic and returns steps.
+
+        We also look for global settings for steps.
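+
+        For example (a sketch; the path is hypothetical), a "set" earlier
+        in the transform applies to every step that follows it:
+
+            - step: set
+              key: stage
+              value: /tmp/stage
+            - step: write
+              filename: job.sh
+            - step: submit
+              filename: job.sh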
""" # We will return a listing of steps to complete + # Each step is provided all settings that are provided + # before it steps = [] # Each filename directive must have a matching script @@ -47,15 +66,27 @@ def parse(self, jobspec): # but not for the time being task = jobspec.jobspec.get("task") + # Global set settings + settings = {"sharedfs": defaults.sharedfs} + # Validate each step for i, step in enumerate(task.get("transform")): # The step must be known to the transformer name = step.get("step") if not name: raise ValueError(f"Step in index {i} is missing a name") + + # If it's a set or unset, add to settings + if name == "set" or name == "unset": + self.update_settings(settings, name, step) + continue + if name not in self.steps: raise ValueError(f"Step {name} is not known to transformer {self.name}") + # This ensures we get the exact state of settings at this level + step["settings"] = copy.deepcopy(settings) + # Instantiate the new step (does extra validation), provided entire jobspec new_step = self.steps[name](jobspec.jobspec, step) steps.append(new_step) @@ -76,6 +107,7 @@ def run(self, filename): # Run each step to submit the job, and that's it. for step in steps: + step_stage = stage or step["settings"].get("stage") self.run_step(step, stage) def run_step(self, step, stage): diff --git a/jobspec/transformer/flux.py b/jobspec/transformer/flux.py index da26f01..429c81d 100644 --- a/jobspec/transformer/flux.py +++ b/jobspec/transformer/flux.py @@ -44,7 +44,9 @@ def __init__(self, *args, **kwargs): class stage(StepBase): """ - A stage step uses flux filemap to stage across nodes + A copy step uses flux filemap to stage across nodes + + This assumes we don't have a shared filesystem. It is skipped if we do. """ name = "stage" @@ -53,6 +55,15 @@ def run(self, stage, *args, **kwargs): """ Run the stage step = fall back to filename for now """ + # If we have a sharedfs, return early, the write will have written it there + sharedfs = self.options.get("settings", {}).get("sharedfs") is True + if sharedfs: + return + + # Sanity check staging directory exists across nodes + cmd = ["mkdir", "-p", stage] + utils.run_command(cmd, check_output=True) + name = str(uuid.uuid4()) filename = self.options["filename"] cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename]