From 3ff140f23c2274b119ae5f07611113541351f080 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 28 Apr 2024 19:41:01 -0600 Subject: [PATCH] wip: save state Signed-off-by: vsoch --- jobspec/core/core.py | 1 + jobspec/core/resources.py | 7 ++- jobspec/transformer/flux/__init__.py | 2 +- jobspec/transformer/flux/steps.py | 39 ++++++++---- jobspec/transformer/flux/transformer.py | 49 --------------- jobspec/transformer/flux/workload.py | 80 +++++++++++++------------ jobspec/utils.py | 2 +- 7 files changed, 80 insertions(+), 100 deletions(-) delete mode 100644 jobspec/transformer/flux/transformer.py diff --git a/jobspec/core/core.py b/jobspec/core/core.py index de14f60..850178c 100644 --- a/jobspec/core/core.py +++ b/jobspec/core/core.py @@ -97,3 +97,4 @@ def update(self, requires): # If we have the group, update on the level of fields self.data[group].update(fields) + return self diff --git a/jobspec/core/resources.py b/jobspec/core/resources.py index 7e96c8d..d83ac8f 100644 --- a/jobspec/core/resources.py +++ b/jobspec/core/resources.py @@ -17,7 +17,6 @@ def find_resources(flat, resource, slot, last_one=False): # More traversing... if "with" in resource: for r in resource["with"]: - print(r) find_resources(flat, r, slot, last_one) return flat @@ -31,6 +30,12 @@ def parse_resource_subset(named_resources, resources): the batch has won't be satisfied. But if this is a grow/autoscale setup, maybe it eventually could be, so we allow it. """ + # If we are given a Resources object, unwrap the data + if hasattr(resources, "data"): + resources = resources.data + if hasattr(named_resources, "data"): + named_resources = named_resources.data + # Case 1: we have resources as a string and it's a member of named if isinstance(resources, str): if "|" in resources: diff --git a/jobspec/transformer/flux/__init__.py b/jobspec/transformer/flux/__init__.py index 54b4683..c6475e5 100644 --- a/jobspec/transformer/flux/__init__.py +++ b/jobspec/transformer/flux/__init__.py @@ -1 +1 @@ -from .transformer import Transformer +from .workload import FluxWorkload as Transformer diff --git a/jobspec/transformer/flux/steps.py b/jobspec/transformer/flux/steps.py index ed56371..41cb6c1 100644 --- a/jobspec/transformer/flux/steps.py +++ b/jobspec/transformer/flux/steps.py @@ -163,19 +163,23 @@ def write_tasks_script(self): data = script_prefix for task in self.tasks: if task.name == "batch": - print("TODO deal with nested batch") - import IPython - - IPython.embed() + cmd, _ = self.generate_command(waitable=True) + data.append(" ".join(cmd)) + # This is the jobspec + data.append("# rm -rf {cmd[-1]}") data.append(" ".join(task.generate_command(waitable=True))) # Ensure all jobs are waited on data.append("flux job wait --all") return {"mode": 33216, "data": "\n".join(data), "encoding": "utf-8"} - def run(self, *args, **kwargs): + def generate_command(self, waitable=False): """ - Run the batch step + Convenience function to generate the command. + + This is also intended for flux batch to use, + and we expect the last argument to be the temporary file + that needs to be cleaned up """ # With batch, we are going to cheat and run a flux job submit --dry-run # with the command that would normally be derived for flux batch. @@ -199,20 +203,33 @@ def run(self, *args, **kwargs): files["batch-script"] = self.write_tasks_script() js["attributes"]["system"]["files"] = files + # Write the jobspec to a temporary file, target for cleanup + tmpfile = utils.get_tmpfile(prefix="jobspec-") + utils.write_file(json.dumps(js), tmpfile) + # Prepare a result, we can show the batch script if debug is on result = Result() for line in files["batch-script"]["data"].split("\n"): result.add_debug_line(line) - # Write the jobspec to a temporary file, target for cleanup - tmpfile = utils.get_tmpfile(prefix="jobspec-") - utils.write_file(json.dumps(js), tmpfile) - # and submit with flux job submit # We can't really watch here, or do anything with attributes yet - cmd = ["flux", "job", "submit", tmpfile] + cmd = ["flux", "job", "submit"] + if waitable: + cmd += ["--flags=waitable"] + cmd.append(tmpfile) + return cmd, result + + def run(self, *args, **kwargs): + """ + Run the batch step + """ + cmd, result = self.generate_command() res = utils.run_command(cmd, check_output=True) + # The temporary file to cleanup is the last in the list + tmpfile = cmd[-1] + # Prepare a result to return result.out = res["message"].strip() result.add_debug_line(" ".join(cmd)) diff --git a/jobspec/transformer/flux/transformer.py b/jobspec/transformer/flux/transformer.py deleted file mode 100644 index 3bb7feb..0000000 --- a/jobspec/transformer/flux/transformer.py +++ /dev/null @@ -1,49 +0,0 @@ -import uuid - -import jobspec.core as js -import jobspec.core.resources as rcore -from jobspec.runner import TransformerBase - -from .steps import batch, stage, submit -from .workload import FluxWorkload - -# handle for steps to use -handle = None - - -class Transformer(TransformerBase): - """ - The flux transformer - """ - - # These metadata fields are required (and checked for) - name = "flux" - description = "Flux Framework transformer" - - def __init__(self, *args, **kwargs): - # Ensure we have a flux handle - global handle - import flux - - handle = flux.Flux() - super().__init__(*args, **kwargs) - - def parse(self, jobspec): - """ - Parse the jobspec into tasks for flux. - """ - # The transformer needs to share state between tasks and groups, so we create - # a shared class to do that. A "flux workload" is some number of tasks / batches - # It replaces the step abstractions to be run. Anoher workload manager that does - # not have the tight dependencies would not need to do this. - workload = FluxWorkload(jobspec) - workload.parse() - - # Return the workload to run - return [workload] - - -# A transformer can register shared steps, or custom steps -Transformer.register_step(batch) -Transformer.register_step(submit) -Transformer.register_step(stage) diff --git a/jobspec/transformer/flux/workload.py b/jobspec/transformer/flux/workload.py index f0f5aeb..39448fe 100644 --- a/jobspec/transformer/flux/workload.py +++ b/jobspec/transformer/flux/workload.py @@ -1,53 +1,36 @@ +import copy + import jobspec.core as js import jobspec.core.resources as rcore -import jobspec.steps.runner as step_runner -from jobspec.transformer.result import Result +from jobspec.runner import TransformerBase -from .steps import batch, submit +from .steps import batch, stage, submit -class FluxWorkload: +class FluxWorkload(TransformerBase): """ - The flux workload (batch and submit) holder + The flux workload (batch and submit) transformer + + The transformer needs to share state between tasks and groups, so we create + a shared class to do that. A "flux workload" is some number of tasks / batches + It replaces the step abstractions to be run. """ - name = "workload" + # These metadata fields are required (and checked for) + name = "flux" + description = "Flux Framework workload" - def __init__(self, jobspec): + def parse(self, jobspec): """ - Add the jobspec and set globals (resources, tasks, requires) + Parse the jobspec into tasks for flux. """ + # Reset the jobspec and groups and tasks self.js = jobspec self.group_lookup = {} # Top level, a-la-carte tasks (steps) - # TODO - we might need to decide on order for top level tasks vs. groups self.tasks = [] - @property - def resources(self): - """ - This returns a global resource lookup - """ - return self.js.get("resources", {}) - - @property - def requires(self): - return self.js.get("requires", {}) - - def run(self, *args, **kwargs): - """ - Run the steps of the workload, some number of submit/batch. - """ - print() - # This will be a combination of submit and batch - for step in self.tasks: - step_runner.run("flux", step) - - def parse(self): - """ - Parse the jobspec into tasks for flux. - """ # Parse top level groups into a lookup. Those that don't have a name # are given a name based on order, and assumed not to be linked to anything for i, group in enumerate(self.js.get("groups") or []): @@ -61,11 +44,27 @@ def parse(self): self.tasks = self.parse_tasks(tasks, self.resources, requires=self.requires) # Now parse remaining groups "a la carte" that aren't linked to top level tasks - for name, group in self.group_lookup.items(): - self.tasks.prepend( - self.parse_group(group, name, self.resources, requires=self.requires) + # We copy because otherwise the dict changes size when the task parser removes + groups = copy.deepcopy(self.group_lookup) + for name, group in groups.items(): + self.tasks.insert( + 0, self.parse_group(group, name, self.resources, requires=self.requires) ) + # Return the transformer to call run to + return self.tasks + + @property + def resources(self): + """ + This returns a global resource lookup + """ + return self.js.get("resources", {}) + + @property + def requires(self): + return self.js.get("requires", {}) + def parse_group(self, group, name, resources=None, requires=None, attributes=None): """ Parse a group and return a step. If tasks are within a group, @@ -94,7 +93,8 @@ def parse_group(self, group, name, resources=None, requires=None, attributes=Non name_prefix = f"{name}-" steps = self.parse_tasks( tasks, - resources=group_resources, + # Still provide global resources as a lookup + resources=resources, requires=group_requires.data, attributes=group_attributes, name_prefix=name_prefix, @@ -187,3 +187,9 @@ def parse_tasks(self, tasks, resources=None, attributes=None, requires=None, nam steps.append(new_step) return steps + + +# A transformer can register shared steps, or custom steps +FluxWorkload.register_step(batch) +FluxWorkload.register_step(submit) +FluxWorkload.register_step(stage) diff --git a/jobspec/utils.py b/jobspec/utils.py index 88fc191..26b657f 100644 --- a/jobspec/utils.py +++ b/jobspec/utils.py @@ -45,7 +45,7 @@ def get_tmpdir(tmpdir=None, prefix="", create=True): Get a temporary directory for an operation. """ tmpdir = tmpdir or tempfile.gettempdir() - prefix = prefix or "shpc-tmp" + prefix = prefix or "jobspec-" prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names())) tmpdir = os.path.join(tmpdir, prefix)