From eeaa039195e99eae706bd203e4ff5008dbe0fb95 Mon Sep 17 00:00:00 2001
From: vsoch
Date: Sun, 28 Apr 2024 18:55:24 -0600
Subject: [PATCH] feat: support for flux workload

Problem: a workload can be any number of submit/batch
Solution: create a workload abstraction that can handle the nesting.
We can write flux batch scripts as jobspecs and submit them with
flux job submit. To do this, we run flux submit with --dry-run, using
a faux command that references a batch-script (in the files section)
that does not exist yet. Then we load the generated jobspec, add the
faux script (with the subtasks), and run flux job submit. Next I need
to work on the group in group case (a flux batch that submits another
flux batch)

Signed-off-by: vsoch
---
 examples/task-with-group.yaml           |   3 +-
 jobspec/core/core.py                    |   4 +-
 jobspec/core/resources.py               |   2 +-
 jobspec/runner.py                       |  22 +-
 jobspec/schema.py                       |   1 +
 jobspec/steps/base.py                   |  11 +-
 jobspec/steps/runner.py                 |  22 ++
 jobspec/transformer/flux/__init__.py    |   2 +-
 jobspec/transformer/flux/steps.py       | 257 ++++++++++++++++++++
 jobspec/transformer/flux/transformer.py | 310 +-----------------------
 jobspec/transformer/flux/workload.py    | 189 +++++++++++++++
 jobspec/transformer/result.py           |   2 +
 jobspec/utils.py                        |  17 ++
 spec-1.md                               |  20 +-
 14 files changed, 529 insertions(+), 333 deletions(-)
 create mode 100644 jobspec/steps/runner.py
 create mode 100644 jobspec/transformer/flux/steps.py
 create mode 100644 jobspec/transformer/flux/workload.py

diff --git a/examples/task-with-group.yaml b/examples/task-with-group.yaml
index 88251de..1693da4 100644
--- a/examples/task-with-group.yaml
+++ b/examples/task-with-group.yaml
@@ -11,7 +11,8 @@ groups:
   - name: task-2
     depends_on: ["task-1"]
     resources: common
-    command:
+    tasks:
+    - command:
       - bash
      - -c
      - "echo Starting task 2; sleep 3; echo Finishing task 2"
diff --git a/jobspec/core/core.py b/jobspec/core/core.py
index cb411b8..de14f60 100644
--- a/jobspec/core/core.py
+++ b/jobspec/core/core.py
@@ -26,9 +26,9 @@ def validate(self):
         """
         jsonschema.validate(self.data, self.schema)
 
-        # Require at least one of command or steps
+        # Require at least one of command or steps, unless it is a group
        for task in self.data.get("tasks", []):
-            if "command" not in task and "steps" not in task:
+            if "group" not in task and ("command" not in task and "steps" not in task):
                 raise ValueError("Jobspec is not valid, each task must have a command or steps")
diff --git a/jobspec/core/resources.py b/jobspec/core/resources.py
index 978cb94..7e96c8d 100644
--- a/jobspec/core/resources.py
+++ b/jobspec/core/resources.py
@@ -39,7 +39,7 @@ def parse_resource_subset(named_resources, resources):
             raise ValueError("Asking for an AND in resources is not supported yet.")
         if resources not in named_resources:
             raise ValueError(f"Asked for resources '{resources}' that are not known")
-        return named_resources["resources"]
+        return named_resources[resources]
 
     # Case 2: It's just it's own thing
     return resources
diff --git a/jobspec/runner.py b/jobspec/runner.py
index 2717acd..71e1649 100644
--- a/jobspec/runner.py
+++ b/jobspec/runner.py
@@ -1,11 +1,9 @@
 import os
-import sys
 
 # This imports the latest version
 import jobspec.core as js
 import jobspec.defaults as defaults
-import jobspec.logger as logger
-from jobspec.logger import LogColors
+import jobspec.steps.runner as step_runner
 
 
 class TransformerBase:
@@ -79,23 +77,7 @@ def run(self, filename):
 
         # Run each step to submit the job, and that's it.
         for step in steps:
-            self.run_step(step)
-
-    def run_step(self, step):
-        """
-        Run a single step. Make it pretty.
- """ - prefix = f"{self.name} {step.name}".ljust(15) - print(f"=> {LogColors.OKCYAN}{prefix}{LogColors.ENDC}", end="") - try: - result = step.run() - print( - f"{LogColors.OKBLUE}{result.out}{LogColors.ENDC} {LogColors.OKGREEN}OK{LogColors.ENDC}" - ) - result.print_extra() - except Exception as e: - print(f"\n{LogColors.RED}{str(e)}{LogColors.ENDC}") - sys.exit() + step_runner.run(self.name, step) def load_jobspec(self, filename): """ diff --git a/jobspec/schema.py b/jobspec/schema.py index ab887de..3079212 100644 --- a/jobspec/schema.py +++ b/jobspec/schema.py @@ -124,6 +124,7 @@ # Tasks for the group "tasks": {"$ref": "#definitions/tasks"}, }, + "additionalProperties": False, }, "intranode_resource_vertex": { "description": "schema for resource vertices within a node, cannot have child vertices", diff --git a/jobspec/steps/base.py b/jobspec/steps/base.py index b577860..eded26c 100644 --- a/jobspec/steps/base.py +++ b/jobspec/steps/base.py @@ -21,8 +21,9 @@ def __init__(self, js, **kwargs): # Shared validation self._validate() - # Custom validation + # Custom validation and setup self.validate() + self.setup(**kwargs) def _validate(self): """ @@ -41,6 +42,14 @@ def validate(self): """ pass + def setup(self, **kwargs): + """ + Custom setup + + Akin to overriding init, but easier to write. + """ + pass + def run(self, *args, **kwargs): """ Run a step. diff --git a/jobspec/steps/runner.py b/jobspec/steps/runner.py new file mode 100644 index 0000000..4776c74 --- /dev/null +++ b/jobspec/steps/runner.py @@ -0,0 +1,22 @@ +import sys + +from jobspec.logger import LogColors + + +def run(name, step): + """ + Run a single step. Make it pretty. + """ + prefix = f"{name} {step.name}".ljust(15) + print(f"=> {LogColors.OKCYAN}{prefix}{LogColors.ENDC}", end="") + try: + result = step.run() + if not result: + return + print( + f"{LogColors.OKBLUE}{result.out}{LogColors.ENDC} {LogColors.OKGREEN}OK{LogColors.ENDC}" + ) + result.print_extra() + except Exception as e: + print(f"\n{LogColors.RED}{str(e)}{LogColors.ENDC}") + sys.exit() diff --git a/jobspec/transformer/flux/__init__.py b/jobspec/transformer/flux/__init__.py index 6ff14d5..54b4683 100644 --- a/jobspec/transformer/flux/__init__.py +++ b/jobspec/transformer/flux/__init__.py @@ -1 +1 @@ -from .transformer import Transformer \ No newline at end of file +from .transformer import Transformer diff --git a/jobspec/transformer/flux/steps.py b/jobspec/transformer/flux/steps.py new file mode 100644 index 0000000..ed56371 --- /dev/null +++ b/jobspec/transformer/flux/steps.py @@ -0,0 +1,257 @@ +import json +import os +import uuid + +import jobspec.utils as utils +from jobspec.steps.base import StepBase +from jobspec.transformer.result import Result + +script_prefix = ["#!/bin/bash"] + +# Custom Flux steps - just write and register! + + +class stage(StepBase): + """ + A copy step uses flux filemap to stage across nodes + + This assumes we don't have a shared filesystem. It is skipped if we do. 
+ """ + + name = "stage" + + def run(self, stage, *args, **kwargs): + """ + Run the stage step = fall back to filename for now + """ + # If we have a sharedfs, return early, the write will have written it there + sharedfs = self.options.get("settings", {}).get("sharedfs") is True + if sharedfs: + return + + # Sanity check staging directory exists across nodes + cmd = ["flux", "exec", "-r", "all", "-x", "0", "mkdir", "-p", stage] + utils.run_command(cmd, check_output=True) + + name = str(uuid.uuid4()) + filename = self.options["filename"] + cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename] + utils.run_command(cmd, check_output=True) + + # Assume we send to all ranks besides where we've already written it + # This will likely fail if the filesystem is shared + cmd = [ + "flux", + "exec", + "--dir", + stage, + "-r", + "all", + "-x", + "0", + "flux", + "filemap", + "get", + "--tags", + name, + ] + utils.run_command(cmd, check_output=False) + + # Unmap to clear the memory map + cmd = ["flux", "filemap", "unmap", "--tags", name] + utils.run_command(cmd, check_output=True) + + +class JobBase(StepBase): + """ + base with shared logic for submit or batch + """ + + def cleanup(self, filename): + """ + Cleanup a filename if it still exists + """ + if os.path.exists(filename): + os.remove(filename) + + def prepare(self, command=None, waitable=False): + """ + Return the command, without flux submit|batch + """ + cmd = [] + + # We can get the resources from options + resources = self.options.get("resources") + + # These aren't used yet - they need to go into flux + attributes = self.options.get("attributes") or {} + task = self.options.get("task") or {} + + # This flattens to be what we ask flux for + slot = resources.flatten_slot() + nodes = slot.get("node") + tasks = slot.get("core") + + # Get name, jobspec, depends, etc + name = self.options.get("name") + duration = attributes.get("duration") + cwd = attributes.get("cwd") + watch = attributes.get("watch") + + # We can't support this yet because it needs the jobid + # That design to require to get it seems fragile + # for depends_on in task.get("depends_on") or []: + # cmd += [f"--dependency={depends_on}"] + + if cwd is not None: + cmd += ["--cwd", cwd] + if name is not None: + cmd += ["--job-name", name] + if duration is not None: + cmd += ["--time-limit", str(duration)] + if watch is True: + cmd += ["--watch"] + + if nodes: + cmd += ["-N", str(nodes)] + if tasks: + cmd += ["-n", str(tasks)] + + # Replicas we do with cc + replicas = task.get("replicas") + if replicas: + replicas -= 1 + cmd += ["--cc", f"0-{replicas}"] + + # Are we adding a waitable flag? This will prevent the batch + # from exiting + if waitable: + cmd += ["--flags=waitable"] + + # Right now assume command is required + if not command: + command = task["command"] + if isinstance(command, str): + command = [command] + cmd += command + return cmd + + +class batch(JobBase): + """ + flux batch is the flux implementation of a group. + + It starts an allocation to run commands and launch + other tasks or groups. + """ + + name = "batch" + + def setup(self, **kwargs): + """ + Custom setup + + A batch can hold one or more task steps, and these need to be written into + a custom script (and not run as tasks) + """ + self.tasks = [] + + def write_tasks_script(self): + """ + Generate the batch script. 
+ """ + data = script_prefix + for task in self.tasks: + if task.name == "batch": + print("TODO deal with nested batch") + import IPython + + IPython.embed() + data.append(" ".join(task.generate_command(waitable=True))) + + # Ensure all jobs are waited on + data.append("flux job wait --all") + return {"mode": 33216, "data": "\n".join(data), "encoding": "utf-8"} + + def run(self, *args, **kwargs): + """ + Run the batch step + """ + # With batch, we are going to cheat and run a flux job submit --dry-run + # with the command that would normally be derived for flux batch. + command = ["flux", "broker", "{{tmpdir}}/batch-script"] + cmd = self.prepare(command) + + # This returns a partial jobspec for the work that is needed... + cmd = ["flux", "submit", "--dry-run"] + cmd + res = utils.run_command(cmd, check_output=True) + + if res["return_code"] != 0: + raise ValueError(f"Issue generating flux jobspec: {res['message']}") + + # Then we will add our (not written) batch script to the files section + # Load the jobspec and add our "files" section to it with tasks (flux submit, etc) + js = json.loads(res["message"]) + + # Be careful about updating files - not sure if there are cases + # when there might be other content there. + files = js["attributes"]["system"].get("files") or {} + files["batch-script"] = self.write_tasks_script() + js["attributes"]["system"]["files"] = files + + # Prepare a result, we can show the batch script if debug is on + result = Result() + for line in files["batch-script"]["data"].split("\n"): + result.add_debug_line(line) + + # Write the jobspec to a temporary file, target for cleanup + tmpfile = utils.get_tmpfile(prefix="jobspec-") + utils.write_file(json.dumps(js), tmpfile) + + # and submit with flux job submit + # We can't really watch here, or do anything with attributes yet + cmd = ["flux", "job", "submit", tmpfile] + res = utils.run_command(cmd, check_output=True) + + # Prepare a result to return + result.out = res["message"].strip() + result.add_debug_line(" ".join(cmd)) + + # Cleanup the files + self.cleanup(tmpfile) + return result + + +class submit(JobBase): + name = "submit" + + def generate_command(self, waitable=False): + """ + Convenience function to generate the command. + + This is intended for flux batch to use + """ + cmd = self.prepare(waitable=waitable) + return ["flux", "submit"] + cmd + + def run(self, *args, **kwargs): + """ + Run the submit step. + + The python bindings are giving me weird errors. + """ + cmd = self.generate_command() + + # Are we watching? 
+        attributes = self.options.get("attributes") or {}
+        watch = attributes.get("watch")
+        res = utils.run_command(cmd, check_output=True, stream=watch)
+
+        # Prepare a result to return
+        result = Result()
+
+        # Return results to print
+        if not watch:
+            result.out = res["message"].strip()
+            result.add_debug_line(" ".join(cmd))
+        return result
diff --git a/jobspec/transformer/flux/transformer.py b/jobspec/transformer/flux/transformer.py
index e0c19f1..3bb7feb 100644
--- a/jobspec/transformer/flux/transformer.py
+++ b/jobspec/transformer/flux/transformer.py
@@ -1,11 +1,11 @@
 import uuid
 
 import jobspec.core as js
-import jobspec.utils as utils
-from jobspec.runner import TransformerBase
-from jobspec.steps.base import StepBase
-from jobspec.transformer.result import Result
 import jobspec.core.resources as rcore
+from jobspec.runner import TransformerBase
+
+from .steps import batch, stage, submit
+from .workload import FluxWorkload
 
 # handle for steps to use
 handle = None
@@ -32,302 +32,18 @@ def parse(self, jobspec):
         """
         Parse the jobspec into tasks for flux.
         """
-        # Return series of steps
-        steps = []
-
-        # Save lookup of named group (batch) steps
-        # Some of these will be nested, and some at the top level to submit
-        groups = {}
-
-        # Start with global named resources, attributes, and requires
-        resources = jobspec.get("resources", {})
-        requires = jobspec.get("requires", {})
-        tasks = jobspec.get("tasks", [])
-        groups = jobspec.get("groups", [])
-
-        # We will return a listing of steps to complete
-        # Parse a-la-carte tasks for the top level - flux submit each
-        if tasks:
-            steps += self.parse_tasks(jobspec, tasks, resources, requires=requires)
-
-        # Now add on groups. Each group can have a-la-carte tasks,
-        # and commands to run in the group (script)
-        if groups:
-            steps += self.parse_groups(jobspec, groups, resources, requires=requires)
-        return steps
-
-    def parse_groups(self, jobspec, groups, resources=None, requires=None):
-        """
-        Parse groups and return a list of steps. If tasks are within a group,
-        they are written to the script of the group.
-
-        This might be tricky to figure out for the very nested version.
-        """
-        # We first need to find
-
-        print("PARSE GROUPS")
-        import IPython
-
-        IPython.embed()
-        sys.exit()
-        resources = resources or {}
-        attributes = attributes or {}
-
-        steps = []
-        for i, task in enumerate(tasks):
-            task_resources = task.get("resources", {})
-
-            # Create a name based on the index or the task name
-            name = task.get("name") or f"task-{i}"
-
-            # The slot is optional and drives where the match is targeting
-            slot = task.get("slot")
-
-            # If the task has resources, must be:
-            # A named section in the global resources
-            # A subset of parent resources
-            task_resources = js.Resources(
-                rcore.parse_resource_subset(resources, task_resources), slot=slot
-            )
-
-            # Derive and update task attributes, if provided
-            task_attributes = js.Attributes(attributes)
-            task_attributes.update(task.get("attributes"))
-
-            # Same for requires. This might eventually include retrieval of
-            # artifact metadata first.
-            task_requires = js.Requires(attributes)
-            task_requires.update(task.get("requires"))
-
-            new_step = self.steps["submit"](
-                jobspec,
-                name=name,
-                resources=task_resources,
-                attributes=task_attributes,
-                requires=task_requires,
-                task=task,
-            )
-            steps.append(new_step)
-
-        return steps
-
-    def parse_tasks(self, jobspec, tasks, resources=None, attributes=None, requires=None):
-        """
-        Parse a jobspec (or group) tasks and return a list of steps.
-
-        If the task is defined in a group, the attributes provided will be inherited
-        from the group. Otherwise they will likely be empty.
-        """
-        resources = resources or {}
-        attributes = attributes or {}
-        requires = requires or {}
-
-        steps = []
-        for i, task in enumerate(tasks):
-            task_resources = task.get("resources", {})
-
-            # Create a name based on the index or the task name
-            name = task.get("name") or f"task-{i}"
-
-            # The slot is optional and drives where the match is targeting
-            slot = task.get("slot")
-
-            # If the task has resources, must be:
-            # A named section in the global resources
-            # A subset of parent resources
-            task_resources = js.Resources(
-                rcore.parse_resource_subset(resources, task_resources), slot=slot
-            )
-
-            # Derive and update task attributes, if provided
-            task_attributes = js.Attributes(attributes)
-            task_attributes.update(task.get("attributes"))
-
-            # Same for requires. This might eventually include retrieval of
-            # artifact metadata first.
-            task_requires = js.Requires(requires)
-            task_requires.update(task.get("requires"))
-
-            new_step = self.steps["submit"](
-                jobspec,
-                name=name,
-                resources=task_resources,
-                attributes=task_attributes,
-                requires=task_requires,
-                task=task,
-            )
-            steps.append(new_step)
-
-        return steps
-
-
-# Custom Flux steps - just write and register!
-
-
-class stage(StepBase):
-    """
-    A copy step uses flux filemap to stage across nodes
-
-    This assumes we don't have a shared filesystem. It is skipped if we do.
-    """
-
-    name = "stage"
-
-    def run(self, stage, *args, **kwargs):
-        """
-        Run the stage step = fall back to filename for now
-        """
-        # If we have a sharedfs, return early, the write will have written it there
-        sharedfs = self.options.get("settings", {}).get("sharedfs") is True
-        if sharedfs:
-            return
-
-        # Sanity check staging directory exists across nodes
-        cmd = ["flux", "exec", "-r", "all", "-x", "0", "mkdir", "-p", stage]
-        utils.run_command(cmd, check_output=True)
-
-        name = str(uuid.uuid4())
-        filename = self.options["filename"]
-        cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename]
-        utils.run_command(cmd, check_output=True)
-
-        # Assume we send to all ranks besides where we've already written it
-        # This will likely fail if the filesystem is shared
-        cmd = [
-            "flux",
-            "exec",
-            "--dir",
-            stage,
-            "-r",
-            "all",
-            "-x",
-            "0",
-            "flux",
-            "filemap",
-            "get",
-            "--tags",
-            name,
-        ]
-        utils.run_command(cmd, check_output=False)
-
-        # Unmap to clear the memory map
-        cmd = ["flux", "filemap", "unmap", "--tags", name]
-        utils.run_command(cmd, check_output=True)
-
-
-class JobBase(StepBase):
-    """
-    base with shared logic for submit or batch
-    """
-
-    def prepare(self):
-        """
-        Return the command, without flux submit|batch
-        """
-        cmd = []
-
-        # We can get the resources from options
-        resources = self.options.get("resources")
-
-        # These aren't used yet - they need to go into flux
-        attributes = self.options.get("attributes") or {}
-        task = self.options.get("task") or {}
-
-        # This flattens to be what we ask flux for
-        slot = resources.flatten_slot()
-        nodes = slot.get("node")
-        tasks = slot.get("core")
-
-        # Get name, jobspec, depends, etc
-        name = self.options.get("name")
-        duration = attributes.get("duration")
-        cwd = attributes.get("cwd")
-        watch = attributes.get("watch")
-
-        # We can't support this yet because it needs the jobid
-        # That design to require to get it seems fragile
-        # for depends_on in task.get("depends_on") or []:
-        #     cmd += [f"--dependency={depends_on}"]
-
-        if cwd is not None:
-            cmd += ["--cwd", cwd]
-        if name is not None:
-            cmd += ["--job-name", name]
-        if duration is not None:
-            cmd += ["--time-limit", str(duration)]
-        if watch is True:
-            cmd += ["--watch"]
-
-        if nodes:
-            cmd += ["-N", str(nodes)]
-        if tasks:
-            cmd += ["-n", str(tasks)]
-
-        # Right now assume command is required
-        command = task["command"]
-        if isinstance(command, str):
-            command = [command]
-        cmd += command
-        return cmd
-
-
-class batch(JobBase):
-    name = "batch"
-
-    def run(self, *args, **kwargs):
-        """
-        Run the batch step
-        """
-        cmd = self.prepare()
-        cmd = ["flux", "batch"] + cmd
-
-        # Are we watching?
-        attributes = self.options.get("attributes") or {}
-        watch = attributes.get("watch")
-        res = utils.run_command(cmd, check_output=True, stream=watch)
-
-        # Prepare a result to return
-        result = Result()
-
-        # Are we watching?
-        attributes = self.options.get("attributes") or {}
-        watch = attributes.get("watch")
-
-        # Return results to print
-        if not watch:
-            result.out = res["message"].strip()
-            result.add_debug_line(" ".join(cmd))
-        return result
-
-
-class submit(JobBase):
-    name = "submit"
-
-    def run(self, *args, **kwargs):
-        """
-        Run the submit step.
-
-        The python bindings are giving me weird errors.
-        """
-        cmd = self.prepare()
-        cmd = ["flux", "submit"] + cmd
-
-        # Are we watching?
-        attributes = self.options.get("attributes") or {}
-        watch = attributes.get("watch")
-        res = utils.run_command(cmd, check_output=True, stream=watch)
-
-        # Prepare a result to return
-        result = Result()
+        # The transformer needs to share state between tasks and groups, so we create
+        # a shared class to do that. A "flux workload" is some number of tasks / batches.
+        # It replaces the step abstractions to be run. Another workload manager that does
+        # not have the tight dependencies would not need to do this.
+        workload = FluxWorkload(jobspec)
+        workload.parse()
 
-        # Return results to print
-        if not watch:
-            result.out = res["message"].strip()
-            result.add_debug_line(" ".join(cmd))
-        return result
+        # Return the workload to run
+        return [workload]
 
 
 # A transformer can register shared steps, or custom steps
 Transformer.register_step(batch)
 Transformer.register_step(submit)
-Transformer.register_step(stage)
\ No newline at end of file
+Transformer.register_step(stage)
diff --git a/jobspec/transformer/flux/workload.py b/jobspec/transformer/flux/workload.py
new file mode 100644
index 0000000..f0f5aeb
--- /dev/null
+++ b/jobspec/transformer/flux/workload.py
@@ -0,0 +1,189 @@
+import jobspec.core as js
+import jobspec.core.resources as rcore
+import jobspec.steps.runner as step_runner
+from jobspec.transformer.result import Result
+
+from .steps import batch, submit
+
+
+class FluxWorkload:
+    """
+    The flux workload (batch and submit) holder
+    """
+
+    name = "workload"
+
+    def __init__(self, jobspec):
+        """
+        Add the jobspec and set globals (resources, tasks, requires)
+        """
+        self.js = jobspec
+        self.group_lookup = {}
+
+        # Top level, a-la-carte tasks (steps)
+        # TODO - we might need to decide on order for top level tasks vs. groups
+        self.tasks = []
+
+    @property
+    def resources(self):
+        """
+        This returns a global resource lookup
+        """
+        return self.js.get("resources", {})
+
+    @property
+    def requires(self):
+        return self.js.get("requires", {})
+
+    def run(self, *args, **kwargs):
+        """
+        Run the steps of the workload, some number of submit/batch.
+ """ + print() + # This will be a combination of submit and batch + for step in self.tasks: + step_runner.run("flux", step) + + def parse(self): + """ + Parse the jobspec into tasks for flux. + """ + # Parse top level groups into a lookup. Those that don't have a name + # are given a name based on order, and assumed not to be linked to anything + for i, group in enumerate(self.js.get("groups") or []): + name = group.get("name") or f"batch-{i}" + self.group_lookup[name] = group + + # We will return a listing of steps to complete - flux submit each + # A group referenced within a task group is parsed there + tasks = self.js.get("tasks") or [] + if tasks: + self.tasks = self.parse_tasks(tasks, self.resources, requires=self.requires) + + # Now parse remaining groups "a la carte" that aren't linked to top level tasks + for name, group in self.group_lookup.items(): + self.tasks.prepend( + self.parse_group(group, name, self.resources, requires=self.requires) + ) + + def parse_group(self, group, name, resources=None, requires=None, attributes=None): + """ + Parse a group and return a step. If tasks are within a group, + they are written to the script of the group. + + This might be tricky to figure out for the very nested version. + """ + resources = resources or {} + requires = requires or {} + attributes = attributes or {} + + group_resources = group.get("resources", {}) + group_attributes = group.get("attributes", {}) + + # Group requires gets updated from globa (and passed to task) + group_requires = js.Requires(requires) + group_requires.update(group.get("requires")) + + # Group resources don't have a slot + group_resources = js.Resources(rcore.parse_resource_subset(resources, group_resources)) + + # Parse the task steps for the group + tasks = group.get("tasks") or [] + steps = [] + if tasks: + name_prefix = f"{name}-" + steps = self.parse_tasks( + tasks, + resources=group_resources, + requires=group_requires.data, + attributes=group_attributes, + name_prefix=name_prefix, + ) + + # Prepare a batch (group) step + new_step = batch( + self.js, + name=name, + resources=group_resources, + attributes=group_attributes, + requires=group_requires, + tasks=tasks, + ) + new_step.tasks = steps + return new_step + + def parse_task(self, task, name, resources=None, requires=None, attributes=None): + """ + Parse a task and return a step. + """ + task_resources = task.get("resources", {}) + + # The slot is optional and drives where the match is targeting + slot = task.get("slot") + + # If the task has resources, must be: + # A named section in the global resources + # A subset of parent resources + task_resources = js.Resources( + rcore.parse_resource_subset(resources, task_resources), slot=slot + ) + + # Derive and update task attributes, if provided + task_attributes = js.Attributes(attributes).update(task.get("attributes")) + + # Same for requires. This might eventually include retrieval of + # artifact metadata first. + task_requires = js.Requires(requires).update(task.get("requires")) + + # Prepare a submit step + return submit( + self.js, + name=name, + resources=task_resources, + attributes=task_attributes, + requires=task_requires, + task=task, + ) + + def parse_tasks(self, tasks, resources=None, attributes=None, requires=None, name_prefix=None): + """ + Parse a jobspec (or group) tasks and return a list of steps. + + If the task is defined in a group, the attributes provided will be inherited + from the group. Otherwise they will likely be empty. 
+ """ + # A name prefix helps with defaults to scope to a group + name_prefix = name_prefix or "" + resources = resources or {} + attributes = attributes or {} + requires = requires or {} + + steps = [] + for i, task in enumerate(tasks): + # Create a name based on the index or the task name + name = task.get("name") or f"task-{i}" + + # Case 1: We found a group! Parse here and add to steps + group_name = task.get("group") + if group_name is not None: + group = self.group_lookup.get(group_name) + if not group: + raise ValueError( + f"Task {name} is looking for group {group_name} that is not defined." + ) + + # We assume a group is used once + del self.group_lookup[group_name] + new_step = self.parse_group( + group, group_name, resources, requires=requires, attributes=attributes + ) + + # Case 2: we have a regular task to flux submit + else: + new_step = self.parse_task( + task, name, resources, requires=requires, attributes=attributes + ) + + steps.append(new_step) + + return steps diff --git a/jobspec/transformer/result.py b/jobspec/transformer/result.py index 8c4efae..54528f3 100644 --- a/jobspec/transformer/result.py +++ b/jobspec/transformer/result.py @@ -1,9 +1,11 @@ from jobspec.logger import logger + class Result: """ Helper class to encompass a result """ + def __init__(self, out=None, prefix=" ", has_depends_on=False): self.out = out or "" self.lines = [] diff --git a/jobspec/utils.py b/jobspec/utils.py index f8787be..88fc191 100644 --- a/jobspec/utils.py +++ b/jobspec/utils.py @@ -23,6 +23,23 @@ def read_file(filename): return content +def get_tmpfile(tmpdir=None, prefix=""): + """ + Get a temporary file with an optional prefix. + """ + # First priority for the base goes to the user requested. + tmpdir = get_tmpdir(tmpdir) + + # If tmpdir is set, add to prefix + if tmpdir: + prefix = os.path.join(tmpdir, os.path.basename(prefix)) + + fd, tmp_file = tempfile.mkstemp(prefix=prefix) + os.close(fd) + + return tmp_file + + def get_tmpdir(tmpdir=None, prefix="", create=True): """ Get a temporary directory for an operation. diff --git a/spec-1.md b/spec-1.md index 0612893..31239d2 100644 --- a/spec-1.md +++ b/spec-1.md @@ -3,7 +3,7 @@ ## Example Let's start with a simple example. This is a **tasks a la carte** pattern, which means submitting isolated tasks, but they depend on one another -(which is not required for the pattern). We aren't doing anything fancy with flux hierarchies. +(which is not required for the pattern). We aren't doing anything fancy with flux hierarchies. For this set of tasks, we want to write a JobSpec to build and run the package "ior" with spack. That might look like this: ```yaml @@ -89,7 +89,7 @@ tasks: This is a more condensed, and easier to read version. We aren't reading _exactly_ from top to bottom because we have to jump back up to see the "spack-resources" reference, but it's more succinct in total, making it appealing still. The above assumes a cluster with a shared filesystem, where a spack install is already on the user's default path. -Now let's walk through specific sections of the above, and then we will move into advanced patterns. +Now let's walk through specific sections of the above, and then we will move into advanced patterns. ## Tasks @@ -120,7 +120,7 @@ This above assumes a shared filesystem. ## Groups -Different workload managers can represent the concept of a logical grouping of tasks. 
+Different workload managers can represent the concept of a logical grouping of tasks. While they might vary in the nesting of the groups (for example, Flux can nest up the smallest granularity or unit of resource possible) most have the idea of a top level batch script running smaller commands. So let's start with that.
 
 - A **group** is a logical set of tasks that are run under shared resources.
 
@@ -181,7 +181,7 @@ groups:
       - local: true
         command: ["kubectl", "apply", "-f", "./task-queue.yaml"]
 
-      # A reference to the group "train" defined below, 
+      # A reference to the group "train" defined below,
       # This will be a flux batch in the top level flux batch
       - group: train
 
@@ -206,7 +206,7 @@ groups:
           - -c
          - |
             ml-train ... -o train.json
-            compspec save-artifact ./train.json --host http://localhost:8080 
+            compspec save-artifact ./train.json --host http://localhost:8080
 
       # And stop the service
       - local: true
@@ -586,7 +586,7 @@ resources:
 groups:
   - name: machine-learning
     resources:
-      
+
     tasks:
 
       # Local means run at the instance level, no flux submit
@@ -597,9 +597,9 @@ groups:
       - name: sleep
         resources: single-node
         command: ["sleep", "infinity"]
-      
+
       # flux batch to launch the generate-data level
-      # TODO we need logic here to say "wait until this finishes" 
+      # TODO we need logic here to say "wait until this finishes"
       # same for submits
       - group: generate-data
@@ -607,7 +607,7 @@ groups:
       - local: true
         command: kubectl delete -f ./database.yaml
 
-  # this is another flux batch, launched before the end, 
+  # this is another flux batch, launched before the end,
   - name: generate-data
     resources: generate-data
     tasks:
@@ -630,4 +630,4 @@ Additional properties and attributes we might want to consider
 - ability to add `--watch` or generally stream logs.
 - easy way to write scripts / config files? Just via a task?
 - how to represent an OR for resources (not thought about this yet)
-- depends_on cannot be supported yet because of [this](https://github.com/flux-framework/flux-core/issues/5917) and I don't like the design of needing to carry around a lookup of job submit IDs to task names.
\ No newline at end of file
+- depends_on cannot be supported yet because of [this](https://github.com/flux-framework/flux-core/issues/5917) and I don't like the design of needing to carry around a lookup of job submit IDs to task names.
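
For context, the dry-run trick that the new `batch.run` step relies on can be reproduced standalone. Below is a minimal sketch, assuming a running Flux instance and a flux-core recent enough to provide `flux submit --dry-run` and file staging through `attributes.system.files`; the resource flags and the script body here are illustrative placeholders, not part of the patch.

```python
import json
import subprocess
import tempfile

# 1. Ask Flux for the jobspec it *would* submit, without submitting it.
#    The command references a batch-script that does not exist yet.
dry = subprocess.run(
    ["flux", "submit", "--dry-run", "-N", "1", "flux", "broker", "{{tmpdir}}/batch-script"],
    capture_output=True, text=True, check=True,
)
jobspec = json.loads(dry.stdout)

# 2. Inject the script under attributes.system.files; Flux materializes
#    these files in the job's tmpdir before the broker starts.
#    33216 == 0o100700: an executable, owner-only regular file.
script = "#!/bin/bash\nflux submit --flags=waitable -n1 echo hello\nflux job wait --all"
files = jobspec["attributes"]["system"].get("files") or {}
files["batch-script"] = {"data": script, "mode": 33216, "encoding": "utf-8"}
jobspec["attributes"]["system"]["files"] = files

# 3. Submit the amended jobspec directly.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
    json.dump(jobspec, fh)
result = subprocess.run(
    ["flux", "job", "submit", fh.name], capture_output=True, text=True, check=True
)
print(result.stdout.strip())  # the assigned jobid
```

The point of the indirection is that nothing needs a shared filesystem: the nested batch script travels inside the jobspec itself, which is why the workload only needs `flux job submit` at the end.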