Skip to content

Commit

Permalink
wip: save state
Browse files Browse the repository at this point in the history
Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Apr 29, 2024
1 parent eeaa039 commit 3ff140f
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 100 deletions.
1 change: 1 addition & 0 deletions jobspec/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,4 @@ def update(self, requires):

# If we have the group, update on the level of fields
self.data[group].update(fields)
return self
7 changes: 6 additions & 1 deletion jobspec/core/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def find_resources(flat, resource, slot, last_one=False):
# More traversing...
if "with" in resource:
for r in resource["with"]:
print(r)
find_resources(flat, r, slot, last_one)
return flat

Expand All @@ -31,6 +30,12 @@ def parse_resource_subset(named_resources, resources):
the batch has won't be satisfied. But if this is a grow/autoscale
setup, maybe it eventually could be, so we allow it.
"""
# If we are given a Resources object, unwrap the data
if hasattr(resources, "data"):
resources = resources.data
if hasattr(named_resources, "data"):
named_resources = named_resources.data

# Case 1: we have resources as a string and it's a member of named
if isinstance(resources, str):
if "|" in resources:
Expand Down
2 changes: 1 addition & 1 deletion jobspec/transformer/flux/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .transformer import Transformer
from .workload import FluxWorkload as Transformer
39 changes: 28 additions & 11 deletions jobspec/transformer/flux/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,23 @@ def write_tasks_script(self):
data = script_prefix
for task in self.tasks:
if task.name == "batch":
print("TODO deal with nested batch")
import IPython

IPython.embed()
cmd, _ = self.generate_command(waitable=True)
data.append(" ".join(cmd))
# This is the jobspec
data.append("# rm -rf {cmd[-1]}")
data.append(" ".join(task.generate_command(waitable=True)))

# Ensure all jobs are waited on
data.append("flux job wait --all")
return {"mode": 33216, "data": "\n".join(data), "encoding": "utf-8"}

def run(self, *args, **kwargs):
def generate_command(self, waitable=False):
"""
Run the batch step
Convenience function to generate the command.
This is also intended for flux batch to use,
and we expect the last argument to be the temporary file
that needs to be cleaned up
"""
# With batch, we are going to cheat and run a flux job submit --dry-run
# with the command that would normally be derived for flux batch.
Expand All @@ -199,20 +203,33 @@ def run(self, *args, **kwargs):
files["batch-script"] = self.write_tasks_script()
js["attributes"]["system"]["files"] = files

# Write the jobspec to a temporary file, target for cleanup
tmpfile = utils.get_tmpfile(prefix="jobspec-")
utils.write_file(json.dumps(js), tmpfile)

# Prepare a result, we can show the batch script if debug is on
result = Result()
for line in files["batch-script"]["data"].split("\n"):
result.add_debug_line(line)

# Write the jobspec to a temporary file, target for cleanup
tmpfile = utils.get_tmpfile(prefix="jobspec-")
utils.write_file(json.dumps(js), tmpfile)

# and submit with flux job submit <jobspec>
# We can't really watch here, or do anything with attributes yet
cmd = ["flux", "job", "submit", tmpfile]
cmd = ["flux", "job", "submit"]
if waitable:
cmd += ["--flags=waitable"]
cmd.append(tmpfile)
return cmd, result

def run(self, *args, **kwargs):
"""
Run the batch step
"""
cmd, result = self.generate_command()
res = utils.run_command(cmd, check_output=True)

# The temporary file to cleanup is the last in the list
tmpfile = cmd[-1]

# Prepare a result to return
result.out = res["message"].strip()
result.add_debug_line(" ".join(cmd))
Expand Down
49 changes: 0 additions & 49 deletions jobspec/transformer/flux/transformer.py

This file was deleted.

80 changes: 43 additions & 37 deletions jobspec/transformer/flux/workload.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,36 @@
import copy

import jobspec.core as js
import jobspec.core.resources as rcore
import jobspec.steps.runner as step_runner
from jobspec.transformer.result import Result
from jobspec.runner import TransformerBase

from .steps import batch, submit
from .steps import batch, stage, submit


class FluxWorkload:
class FluxWorkload(TransformerBase):
"""
The flux workload (batch and submit) holder
The flux workload (batch and submit) transformer
The transformer needs to share state between tasks and groups, so we create
a shared class to do that. A "flux workload" is some number of tasks / batches.
It replaces the step abstractions to be run.
"""

name = "workload"
# These metadata fields are required (and checked for)
name = "flux"
description = "Flux Framework workload"

def __init__(self, jobspec):
def parse(self, jobspec):
"""
Add the jobspec and set globals (resources, tasks, requires)
Parse the jobspec into tasks for flux.
"""
# Reset the jobspec and groups and tasks
self.js = jobspec
self.group_lookup = {}

# Top level, a-la-carte tasks (steps)
# TODO - we might need to decide on order for top level tasks vs. groups
self.tasks = []

@property
def resources(self):
"""
This returns a global resource lookup
"""
return self.js.get("resources", {})

@property
def requires(self):
return self.js.get("requires", {})

def run(self, *args, **kwargs):
"""
Run the steps of the workload, some number of submit/batch.
"""
print()
# This will be a combination of submit and batch
for step in self.tasks:
step_runner.run("flux", step)

def parse(self):
"""
Parse the jobspec into tasks for flux.
"""
# Parse top level groups into a lookup. Those that don't have a name
# are given a name based on order, and assumed not to be linked to anything
for i, group in enumerate(self.js.get("groups") or []):
Expand All @@ -61,11 +44,27 @@ def parse(self):
self.tasks = self.parse_tasks(tasks, self.resources, requires=self.requires)

# Now parse remaining groups "a la carte" that aren't linked to top level tasks
for name, group in self.group_lookup.items():
self.tasks.prepend(
self.parse_group(group, name, self.resources, requires=self.requires)
# We copy because otherwise the dict would change size during iteration when the task parser removes entries
groups = copy.deepcopy(self.group_lookup)
for name, group in groups.items():
self.tasks.insert(
0, self.parse_group(group, name, self.resources, requires=self.requires)
)

# Return the transformer to call run to
return self.tasks

@property
def resources(self):
"""
This returns a global resource lookup
"""
return self.js.get("resources", {})

@property
def requires(self):
return self.js.get("requires", {})

def parse_group(self, group, name, resources=None, requires=None, attributes=None):
"""
Parse a group and return a step. If tasks are within a group,
Expand Down Expand Up @@ -94,7 +93,8 @@ def parse_group(self, group, name, resources=None, requires=None, attributes=Non
name_prefix = f"{name}-"
steps = self.parse_tasks(
tasks,
resources=group_resources,
# Still provide global resources as a lookup
resources=resources,
requires=group_requires.data,
attributes=group_attributes,
name_prefix=name_prefix,
Expand Down Expand Up @@ -187,3 +187,9 @@ def parse_tasks(self, tasks, resources=None, attributes=None, requires=None, nam
steps.append(new_step)

return steps


# A transformer can register shared steps, or custom steps
FluxWorkload.register_step(batch)
FluxWorkload.register_step(submit)
FluxWorkload.register_step(stage)
2 changes: 1 addition & 1 deletion jobspec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def get_tmpdir(tmpdir=None, prefix="", create=True):
Get a temporary directory for an operation.
"""
tmpdir = tmpdir or tempfile.gettempdir()
prefix = prefix or "shpc-tmp"
prefix = prefix or "jobspec-"
prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names()))
tmpdir = os.path.join(tmpdir, prefix)

Expand Down

0 comments on commit 3ff140f

Please sign in to comment.