diff --git a/README.md b/README.md
index a151861..d9fca57 100644
--- a/README.md
+++ b/README.md
@@ -13,18 +13,34 @@ for a specific clusters scheduler. If you think it looks too simple then I'd say
 ## Usage
 
 A transformer provides one or more steps for the jobspec to be transformed and understood for a particular
-execution environment. They include:
+execution environment.
+
+### Steps
+
+Steps include:
 
 | Name   | Description |
|--------|-------------|
 | write  | write a file in the staging directory |
-| stage  | stage a file across nodes |
+| set    | a step to define a global setting |
+| copy   | copy a file into staging (currently just local) |
 | submit | submit the job |
 | batch  | submit the job with a batch command (more common in HPC) |
 | auth   | authenticate with some service |
 
+Note that for the above, we assume a shared filesystem unless the `sharedfs` setting says otherwise.
 These are the basic steps that @vsoch needs now for scheduling experiments, and more can be added (or tweaked) if needed.
 
+### Settings
+
+Any "set" directive can be used to set a more global setting on the transform. For example:
+
+ - stage: defines the staging directory. If not set, a temporary directory is created.
+ - sharedfs: true or false to say whether the filesystem is shared across nodes (defaults to true)
+
+For `sharedfs` it would be ideal to have a setting that is specific to the transformer, but unfortunately this could be true or false
+for flux, so it has to be set explicitly. But this might be an interesting compatibility thing to test.
+
 ### Example
 
 This example will assume receiving a Jobspec on a flux cluster.
@@ -125,9 +141,6 @@ transform:
         filename: install.sh
         executable: true
 
-      - step: stage
-        filename: install.sh
-
       - step: submit
         filename: install.sh
         wait: true
@@ -136,21 +149,36 @@ transform:
         filename: job.sh
         executable: true
 
-      - step: stage
-        filename: job.sh
-
       - step: submit
         filename: job.sh
         wait: true
 ```
 
-The above assumes we don't have a shared filesystem, and the receiving cluster has some cluster-specific method for staging or
-file mapping. It could be ssh, or a filemap, or something else. For an ephemeral cluster API, it might be an interaction with
-a storage provider, or just adding the file to an API call that will (in and of itself) do that creation, akin to a startup script for
-an instance in Terraform. It really doesn't matter - the user can expect the file to be written and shared across nodes.
-This is not intended to be a workflow or build tool - it simply is a transformational layer that a jobspec can provide
-to setup a specific cluster environment. It works with a jobspec in that you define your filenames (scripts) in the tasks->scripts
-directive. It also uses a plugin design, so a cluster or institution can write a custom transformer to install, and it will be discovered
+The above assumes we have a shared filesystem. If we wanted a custom staging directory, we would set it manually:
+
+```yaml
+- step: set
+  key: stage
+  value: /tmp/path-for-workflow
+```
+
+If we don't set it, a temporary staging directory is generated for us. If we didn't have a shared filesystem, we would need
+to say so. It's really akin to a subsystem detail, because a job that assumes a shared filesystem won't be compatible with a
+cluster that doesn't have one.
+
+```yaml
+- step: set
+  key: sharedfs
+  value: false
+```
+
+Whenever there is a copy step (not shown), we assume the receiving cluster has some cluster-specific method for copying or
+file mapping, even without a shared filesystem. It could be ssh, or a filemap, or something else.
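+
+As a purely illustrative sketch (the exact options for copy are not pinned down here, and it currently only handles local
+copies), a copy entry might mirror the filename convention used by the other steps:
+
+```yaml
+# hypothetical example - the real copy options may differ
+- step: copy
+  filename: data.txt
+```
+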
+Similarly, for an ephemeral cluster API, the copy might be an interaction with a storage provider, or just adding the file to
+an API call that will (in and of itself) do that creation, akin to a startup script for an instance in Terraform. It really
+doesn't matter - the user can expect the file to be written and shared across nodes. This is not intended to be a workflow or
+build tool - it is simply a transformational layer that a jobspec can provide to set up a specific cluster environment. It
+works with a jobspec in that you define your filenames (scripts) in the tasks->scripts directive. It also uses a plugin
+design, so a cluster or institution can write a custom transformer to install, and it will be discovered
 by name. This is intended to work with the prototype [rainbow](https://github.com/converged-computing/rainbow) scheduler.
 
 Jobspec is an entity of [flux-framework](https://flux-framework.org).
diff --git a/examples/hello-world-jobspec.yaml b/examples/hello-world-jobspec.yaml
index faa5f3f..b5db450 100644
--- a/examples/hello-world-jobspec.yaml
+++ b/examples/hello-world-jobspec.yaml
@@ -18,11 +18,6 @@ task:
       filename: job.sh
       executable: true
 
-  # This is only provided as an example - in the devcontainer it's just one physicap machine!
-  # and filemap I think will likely not work
-  # - step: stage
-  #   filename: job.sh
-
   - step: submit
     filename: job.sh
 
diff --git a/examples/hello-world-wait-jobspec.yaml b/examples/hello-world-wait-jobspec.yaml
index 0b04e39..85c7261 100644
--- a/examples/hello-world-wait-jobspec.yaml
+++ b/examples/hello-world-wait-jobspec.yaml
@@ -18,13 +18,9 @@ task:
       filename: job.sh
       executable: true
 
-  # This is only provided as an example - in the devcontainer it's just one physicap machine!
-  #- step: stage
-  #  filename: install.sh
-
   - step: submit
     filename: job.sh
-    wait: true
+    watch: true
 
 scripts:
   - name: job.sh
diff --git a/jobspec/defaults.py b/jobspec/defaults.py
new file mode 100644
index 0000000..b3bfb8a
--- /dev/null
+++ b/jobspec/defaults.py
@@ -0,0 +1,2 @@
+valid_settings = {"sharedfs", "stage"}
+sharedfs = True
diff --git a/jobspec/transform.py b/jobspec/runner.py
similarity index 70%
rename from jobspec/transform.py
rename to jobspec/runner.py
index ae9ff5a..fe1997d 100644
--- a/jobspec/transform.py
+++ b/jobspec/runner.py
@@ -1,8 +1,10 @@
+import copy
 import os
 import sys
 
 # This imports the latest version
 import jobspec.core as js
+import jobspec.defaults as defaults
 import jobspec.utils as utils
 from jobspec.logger import LogColors
 
@@ -35,11 +37,31 @@ def register_step(cls, step, name=None):
         name = name or step.name
         cls.steps[name] = step
 
+    def update_settings(self, settings, typ, step):
+        """
+        Update settings, either set or unset
+        """
+        if "key" not in step or "value" not in step:
+            return
+        if not step["key"]:
+            return
+        # This is important for typos, etc.
+        if step["key"] not in defaults.valid_settings:
+            raise ValueError(f"{step['key']} is not a known setting.")
+        if typ == "set":
+            settings[step["key"]] = step["value"]
+        elif typ == "unset" and step["key"] in settings:
+            del settings[step["key"]]
+
     def parse(self, jobspec):
         """
-        parse validates transform logic and returns steps
+        parse validates transform logic and returns steps.
+
+        We also look for global settings ("set" directives) that apply
+        to the steps that come after them.
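+
+        For example, a transform like the following (a sketch of the
+        expected YAML, not validated here) applies the stage setting to
+        every step that comes after the "set" entry:
+
+            - step: set
+              key: stage
+              value: /tmp/workflow
+            - step: write
+              filename: job.sh
+            - step: submit
+              filename: job.sh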
""" # We will return a listing of steps to complete + # Each step is provided all settings that are provided + # before it steps = [] # Each filename directive must have a matching script @@ -47,15 +69,27 @@ def parse(self, jobspec): # but not for the time being task = jobspec.jobspec.get("task") + # Global set settings + settings = {"sharedfs": defaults.sharedfs} + # Validate each step for i, step in enumerate(task.get("transform")): # The step must be known to the transformer name = step.get("step") if not name: raise ValueError(f"Step in index {i} is missing a name") + + # If it's a set or unset, add to settings + if name == "set" or name == "unset": + self.update_settings(settings, name, step) + continue + if name not in self.steps: raise ValueError(f"Step {name} is not known to transformer {self.name}") + # This ensures we get the exact state of settings at this level + step["settings"] = copy.deepcopy(settings) + # Instantiate the new step (does extra validation), provided entire jobspec new_step = self.steps[name](jobspec.jobspec, step) steps.append(new_step) @@ -76,7 +110,8 @@ def run(self, filename): # Run each step to submit the job, and that's it. for step in steps: - self.run_step(step, stage) + step_stage = stage or step["settings"].get("stage") + self.run_step(step, step_stage) def run_step(self, step, stage): """ diff --git a/jobspec/transformer/flux.py b/jobspec/transformer/flux.py index da26f01..00648d0 100644 --- a/jobspec/transformer/flux.py +++ b/jobspec/transformer/flux.py @@ -12,8 +12,8 @@ import jobspec.steps as steps import jobspec.utils as utils +from jobspec.runner import TransformerBase from jobspec.steps.base import StepBase -from jobspec.transform import TransformerBase logger = logging.getLogger("jobspec-flux") @@ -44,7 +44,9 @@ def __init__(self, *args, **kwargs): class stage(StepBase): """ - A stage step uses flux filemap to stage across nodes + A copy step uses flux filemap to stage across nodes + + This assumes we don't have a shared filesystem. It is skipped if we do. """ name = "stage" @@ -53,6 +55,15 @@ def run(self, stage, *args, **kwargs): """ Run the stage step = fall back to filename for now """ + # If we have a sharedfs, return early, the write will have written it there + sharedfs = self.options.get("settings", {}).get("sharedfs") is True + if sharedfs: + return + + # Sanity check staging directory exists across nodes + cmd = ["flux", "exec", "-r", "all", "-x", "0", "mkdir", "-p", stage] + utils.run_command(cmd, check_output=True) + name = str(uuid.uuid4()) filename = self.options["filename"] cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename] @@ -99,51 +110,18 @@ def run(self, stage, *args, **kwargs): # I don't think batch has python bindings? 
        filename = self.options.get("filename")
-        cmd = ["flux", "batch"]
+        cmd = ["flux", "batch", "--cwd", stage]
         if nodes:
             cmd += ["-N", str(nodes)]
         if tasks:
             cmd += ["-n", str(tasks)]
         cmd.append(filename)
-
-        # Would be nice if this was exposed as "from jobspec"
-        # https://github.com/flux-framework/flux-core/blob/master/src/bindings/python/flux/cli/batch.py#L109-L120
         with utils.workdir(stage):
             res = utils.run_command(cmd, check_output=True)
-
-        # 👀️ 👀️ 👀️
         jobid = res["message"].strip()
-        wait = self.options.get("wait") is True
-        if wait:
-            watch_job(handle, jobid)
         return jobid
 
 
-def watch_job(handle, jobid):
-    """
-    Shared function to watch a job
-    """
-    import flux.job
-
-    if isinstance(jobid, str):
-        jobid = flux.job.JobID(jobid)
-
-    print()
-    watcher = flux.job.watcher.JobWatcher(
-        handle,
-        progress=False,
-        jps=False,  # show throughput with progress
-        log_events=False,
-        log_status=True,
-        labelio=False,
-        wait=True,
-        watch=True,
-    )
-    watcher.start()
-    watcher.add_jobid(jobid)
-    handle.reactor_run()
-
-
 class submit(StepBase):
     name = "submit"
 
@@ -157,42 +135,33 @@ def validate(self):
 
     def run(self, stage, *args, **kwargs):
         """
-        Run the submit step
-        """
-        import flux.job
+        Run the submit step.
 
-        # Parse jobspec into yaml stream, because it doesn't have support for json stream
-        # Also remove "experimental" feature lol
-        js = copy.deepcopy(self.jobspec)
-        for key in ["scripts", "transform", "resources"]:
-            if key in js.get("task"):
-                del js["task"][key]
+        The Python bindings were giving weird errors, so we use the flux command line instead.
+        """
+        slot = self.flatten_slot()
+        nodes = slot.get("node")
+        tasks = slot.get("core")
 
-        # Use the filename or fall back to command
+        # Use the flux command line rather than the Python bindings
         filename = self.options.get("filename")
-
-        # Task -> tasks
-        if "task" in js:
-            task = js.get("task")
-            del js["task"]
-            js["tasks"] = [task]
-            if "command" not in task:
-                task["command"] = ["/bin/bash", filename]
-
-        # It requires attributes, even if it's empty...
-        if "attributes" not in js:
-            js["attributes"] = {"system": {"duration": 3600, "cwd": stage}}
-
-        # Are we watching or waiting (note that watching implies waiting?
+        cmd = ["flux", "submit", "--cwd", stage]
         watch = self.options.get("watch") is True
-        wait = self.options.get("wait") is True or watch is True
-        flux_jobspec = flux.job.JobspecV1.from_yaml_stream(yaml.dump(js))
-        jobid = flux.job.submit(handle, flux_jobspec, waitable=wait)
-
-        # 👀️ 👀️ 👀️
         if watch:
-            watch_job(handle, jobid)
-        return jobid.f58plain
+            cmd.append("--watch")
+        if nodes:
+            cmd += ["-N", str(nodes)]
+        if tasks:
+            cmd += ["-n", str(tasks)]
+        cmd += ["/bin/bash", filename]
+        print("\n" + " ".join(cmd))
+
+        with utils.workdir(stage):
+            res = utils.run_command(cmd, check_output=True, stream=watch)
+
+        if not watch:
+            jobid = res["message"].strip()
+            return jobid
 
 
 # A transformer can register shared steps, or custom steps
diff --git a/jobspec/utils.py b/jobspec/utils.py
index f975b63..f8787be 100644
--- a/jobspec/utils.py
+++ b/jobspec/utils.py
@@ -95,5 +95,8 @@ def run_command(cmd, stream=False, check_output=False, return_code=0):
 
     # Check the output and raise an error if not success
     if check_output and t[1] != return_code:
-        raise ValueError(output["message"].strip())
+        if output["message"]:
+            raise ValueError(output["message"].strip())
+        else:
+            raise ValueError(f"Failed execution, return code {t[1]}")
 
     return output
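For reference, with the CLI-based submit step above, a transform entry like the one below (a sketch that mirrors examples/hello-world-wait-jobspec.yaml) ends up invoking roughly `flux submit --cwd <stage> --watch -N <nodes> -n <tasks> /bin/bash job.sh` from inside the staging directory, streaming output rather than returning a jobid:

```yaml
# sketch only - watch streams output; omit it to get the jobid back instead
- step: submit
  filename: job.sh
  watch: true
```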