feat: support for flux workload

Problem: a workload can be any number of submit/batch commands.
Solution: create a workload abstraction that can handle the
nesting. We can write flux batch scripts as jobspecs to submit
with flux job submit. We do this by running flux submit with
--dry-run and adding a faux command that references a
batch-script (in files) that does not exist yet. We then load
the jobspec, add the faux script (with the subtasks), and run
flux job submit <jobspec>. Next I need to work on the
group-in-group case (a flux batch that submits another flux batch).

Signed-off-by: vsoch <[email protected]>
vsoch committed Apr 29, 2024
1 parent b74aa10 commit eeaa039
Showing 14 changed files with 529 additions and 333 deletions.
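
In code, the dry-run trick described in the commit message looks roughly like this (a minimal, standalone sketch; the task command, file names, and flags here are hypothetical):

import json
import subprocess

# Ask flux for the jobspec it would submit, without submitting it
out = subprocess.check_output(
    ["flux", "submit", "--dry-run", "-N", "1", "flux", "broker", "{{tmpdir}}/batch-script"]
)
jobspec = json.loads(out)

# Attach the faux batch-script (the file does not exist on disk) to the files section
jobspec["attributes"]["system"]["files"] = {
    "batch-script": {
        "mode": 33216,  # 0o100700: regular file, owner rwx
        "encoding": "utf-8",
        "data": "#!/bin/bash\nflux submit --flags=waitable sleep 10\nflux job wait --all",
    }
}

with open("jobspec.json", "w") as fh:
    json.dump(jobspec, fh)

# And submit the amended jobspec
subprocess.check_call(["flux", "job", "submit", "jobspec.json"])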
3 changes: 2 additions & 1 deletion examples/task-with-group.yaml

@@ -11,7 +11,8 @@ groups:
   - name: task-2
     depends_on: ["task-1"]
     resources: common
-    command:
+    tasks:
+      - command:
       - bash
       - -c
       - "echo Starting task 2; sleep 3; echo Finishing task 2"
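
For context, this is the shape the fix produces: a group's commands now nest under tasks. A hypothetical minimal group (not from the repository):

groups:
  - name: group-1
    resources: common
    tasks:
      - command:
          - bash
          - -c
          - "echo Hello from a group task"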
4 changes: 2 additions & 2 deletions jobspec/core/core.py

@@ -26,9 +26,9 @@ def validate(self):
         """
         jsonschema.validate(self.data, self.schema)
 
-        # Require at least one of command or steps
+        # Require at least one of command or steps, unless it is a group
         for task in self.data.get("tasks", []):
-            if "command" not in task and "steps" not in task:
+            if "group" not in task and ("command" not in task and "steps" not in task):
                 raise ValueError("Jobspec is not valid, each task must have a command or steps")
 
 
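The effect of the new check, as a standalone sketch over hypothetical task dicts:

def check(tasks):
    # Mirrors the validation above: groups are exempt from the command/steps rule
    for task in tasks:
        if "group" not in task and ("command" not in task and "steps" not in task):
            raise ValueError("Jobspec is not valid, each task must have a command or steps")

check([{"command": ["echo", "hello"]}])  # ok
check([{"group": "group-1"}])            # ok now: a group carries its own tasks
check([{"name": "task-1"}])              # still raises ValueError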
2 changes: 1 addition & 1 deletion jobspec/core/resources.py

@@ -39,7 +39,7 @@ def parse_resource_subset(named_resources, resources):
         raise ValueError("Asking for an AND in resources is not supported yet.")
     if resources not in named_resources:
         raise ValueError(f"Asked for resources '{resources}' that are not known")
-    return named_resources["resources"]
+    return named_resources[resources]
 
     # Case 2: It's just its own thing
     return resources
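
This is a lookup bug fix: the old line always indexed the literal key "resources" rather than the requested name, so any valid named subset raised a KeyError. With hypothetical named resources:

named_resources = {"common": {"type": "node", "count": 2}}

# before: named_resources["resources"] -> KeyError, whatever was asked for
# after:  named_resources["common"] -> {"type": "node", "count": 2}
print(named_resources["common"])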
22 changes: 2 additions & 20 deletions jobspec/runner.py

@@ -1,11 +1,9 @@
 import os
-import sys
 
-# This imports the latest version
 import jobspec.core as js
 import jobspec.defaults as defaults
 import jobspec.logger as logger
-from jobspec.logger import LogColors
+import jobspec.steps.runner as step_runner
 
 
 class TransformerBase:
@@ -79,23 +77,7 @@ def run(self, filename):
 
         # Run each step to submit the job, and that's it.
         for step in steps:
-            self.run_step(step)
-
-    def run_step(self, step):
-        """
-        Run a single step. Make it pretty.
-        """
-        prefix = f"{self.name} {step.name}".ljust(15)
-        print(f"=> {LogColors.OKCYAN}{prefix}{LogColors.ENDC}", end="")
-        try:
-            result = step.run()
-            print(
-                f"{LogColors.OKBLUE}{result.out}{LogColors.ENDC} {LogColors.OKGREEN}OK{LogColors.ENDC}"
-            )
-            result.print_extra()
-        except Exception as e:
-            print(f"\n{LogColors.RED}{str(e)}{LogColors.ENDC}")
-            sys.exit()
+            step_runner.run(self.name, step)
 
     def load_jobspec(self, filename):
         """
1 change: 1 addition & 0 deletions jobspec/schema.py

@@ -124,6 +124,7 @@
             # Tasks for the group
             "tasks": {"$ref": "#definitions/tasks"},
         },
+        "additionalProperties": False,
     },
     "intranode_resource_vertex": {
         "description": "schema for resource vertices within a node, cannot have child vertices",
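
With "additionalProperties": False, unknown keys on a group now fail validation instead of being silently ignored, which matches the example fix above (command must move under tasks). A reduced stand-in for the group definition:

import jsonschema

group = {
    "type": "object",
    "properties": {"tasks": {"type": "array"}},
    "additionalProperties": False,
}

jsonschema.validate({"tasks": []}, group)                 # ok
jsonschema.validate({"tasks": [], "command": []}, group)  # raises jsonschema.exceptions.ValidationError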
11 changes: 10 additions & 1 deletion jobspec/steps/base.py

@@ -21,8 +21,9 @@ def __init__(self, js, **kwargs):
         # Shared validation
         self._validate()
 
-        # Custom validation
+        # Custom validation and setup
         self.validate()
+        self.setup(**kwargs)
 
     def _validate(self):
         """
@@ -41,6 +42,14 @@ def validate(self):
         """
         pass
 
+    def setup(self, **kwargs):
+        """
+        Custom setup
+
+        Akin to overriding init, but easier to write.
+        """
+        pass
+
     def run(self, *args, **kwargs):
         """
         Run a step.
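
The new hook lets a subclass initialize state without overriding __init__ (the flux batch step below uses it to collect its tasks). A hypothetical subclass:

from jobspec.steps.base import StepBase

class collect(StepBase):
    name = "collect"

    def setup(self, **kwargs):
        # called at the end of __init__, after shared and custom validation
        self.tasks = []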
22 changes: 22 additions & 0 deletions jobspec/steps/runner.py
@@ -0,0 +1,22 @@
import sys

from jobspec.logger import LogColors


def run(name, step):
    """
    Run a single step. Make it pretty.
    """
    prefix = f"{name} {step.name}".ljust(15)
    print(f"=> {LogColors.OKCYAN}{prefix}{LogColors.ENDC}", end="")
    try:
        result = step.run()
        if not result:
            return
        print(
            f"{LogColors.OKBLUE}{result.out}{LogColors.ENDC} {LogColors.OKGREEN}OK{LogColors.ENDC}"
        )
        result.print_extra()
    except Exception as e:
        print(f"\n{LogColors.RED}{str(e)}{LogColors.ENDC}")
        sys.exit()
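
Compared to the old TransformerBase.run_step, the behavioral addition is the early return: a step may return None (for example, stage on a shared filesystem), in which case nothing is printed. A hypothetical caller:

import jobspec.steps.runner as step_runner

def run_all(name, steps):
    # name is the display prefix, e.g. the transformer name "flux"
    for step in steps:
        step_runner.run(name, step)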
2 changes: 1 addition & 1 deletion jobspec/transformer/flux/__init__.py

@@ -1 +1 @@
-from .transformer import Transformer
\ No newline at end of file
+from .transformer import Transformer
257 changes: 257 additions & 0 deletions jobspec/transformer/flux/steps.py
@@ -0,0 +1,257 @@
import json
import os
import uuid

import jobspec.utils as utils
from jobspec.steps.base import StepBase
from jobspec.transformer.result import Result

script_prefix = ["#!/bin/bash"]

# Custom Flux steps - just write and register!


class stage(StepBase):
    """
    A copy step uses flux filemap to stage across nodes.
    This assumes we don't have a shared filesystem. It is skipped if we do.
    """

    name = "stage"

    def run(self, stage, *args, **kwargs):
        """
        Run the stage step - fall back to filename for now
        """
        # If we have a sharedfs, return early, the write will have written it there
        sharedfs = self.options.get("settings", {}).get("sharedfs") is True
        if sharedfs:
            return

        # Sanity check staging directory exists across nodes
        cmd = ["flux", "exec", "-r", "all", "-x", "0", "mkdir", "-p", stage]
        utils.run_command(cmd, check_output=True)

        name = str(uuid.uuid4())
        filename = self.options["filename"]
        cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename]
        utils.run_command(cmd, check_output=True)

        # Assume we send to all ranks besides where we've already written it
        # This will likely fail if the filesystem is shared
        cmd = [
            "flux",
            "exec",
            "--dir",
            stage,
            "-r",
            "all",
            "-x",
            "0",
            "flux",
            "filemap",
            "get",
            "--tags",
            name,
        ]
        utils.run_command(cmd, check_output=False)

        # Unmap to clear the memory map
        cmd = ["flux", "filemap", "unmap", "--tags", name]
        utils.run_command(cmd, check_output=True)

class JobBase(StepBase):
    """
    Base with shared logic for submit or batch.
    """

    def cleanup(self, filename):
        """
        Cleanup a filename if it still exists
        """
        if os.path.exists(filename):
            os.remove(filename)

    def prepare(self, command=None, waitable=False):
        """
        Return the command, without flux submit|batch
        """
        cmd = []

        # We can get the resources from options
        resources = self.options.get("resources")

        # These aren't used yet - they need to go into flux
        attributes = self.options.get("attributes") or {}
        task = self.options.get("task") or {}

        # This flattens to be what we ask flux for
        slot = resources.flatten_slot()
        nodes = slot.get("node")
        tasks = slot.get("core")

        # Get name, jobspec, depends, etc.
        name = self.options.get("name")
        duration = attributes.get("duration")
        cwd = attributes.get("cwd")
        watch = attributes.get("watch")

        # We can't support this yet because it needs the jobid, and a
        # design that requires getting it up front seems fragile
        # for depends_on in task.get("depends_on") or []:
        #     cmd += [f"--dependency={depends_on}"]

        if cwd is not None:
            cmd += ["--cwd", cwd]
        if name is not None:
            cmd += ["--job-name", name]
        if duration is not None:
            cmd += ["--time-limit", str(duration)]
        if watch is True:
            cmd += ["--watch"]

        if nodes:
            cmd += ["-N", str(nodes)]
        if tasks:
            cmd += ["-n", str(tasks)]

        # Replicas we do with cc
        replicas = task.get("replicas")
        if replicas:
            replicas -= 1
            cmd += ["--cc", f"0-{replicas}"]

        # Are we adding a waitable flag? This will prevent the batch
        # from exiting
        if waitable:
            cmd += ["--flags=waitable"]

        # Right now assume command is required
        if not command:
            command = task["command"]
        if isinstance(command, str):
            command = [command]
        cmd += command
        return cmd


class batch(JobBase):
    """
    flux batch is the flux implementation of a group.
    It starts an allocation to run commands and launch
    other tasks or groups.
    """

    name = "batch"

    def setup(self, **kwargs):
        """
        Custom setup.
        A batch can hold one or more task steps, and these need to be written into
        a custom script (and not run as tasks)
        """
        self.tasks = []

    def write_tasks_script(self):
        """
        Generate the batch script.
        """
        # Copy the prefix so appends don't mutate the module-level list
        data = list(script_prefix)
        for task in self.tasks:
            if task.name == "batch":
                print("TODO deal with nested batch")
                import IPython

                IPython.embed()
            data.append(" ".join(task.generate_command(waitable=True)))

        # Ensure all jobs are waited on
        data.append("flux job wait --all")
        return {"mode": 33216, "data": "\n".join(data), "encoding": "utf-8"}

    def run(self, *args, **kwargs):
        """
        Run the batch step
        """
        # With batch, we are going to cheat and run a flux job submit --dry-run
        # with the command that would normally be derived for flux batch.
        command = ["flux", "broker", "{{tmpdir}}/batch-script"]
        cmd = self.prepare(command)

        # This returns a partial jobspec for the work that is needed...
        cmd = ["flux", "submit", "--dry-run"] + cmd
        res = utils.run_command(cmd, check_output=True)

        if res["return_code"] != 0:
            raise ValueError(f"Issue generating flux jobspec: {res['message']}")

        # Then we will add our (not written) batch script to the files section
        # Load the jobspec and add our "files" section to it with tasks (flux submit, etc)
        js = json.loads(res["message"])

        # Be careful about updating files - not sure if there are cases
        # when there might be other content there.
        files = js["attributes"]["system"].get("files") or {}
        files["batch-script"] = self.write_tasks_script()
        js["attributes"]["system"]["files"] = files

        # Prepare a result, we can show the batch script if debug is on
        result = Result()
        for line in files["batch-script"]["data"].split("\n"):
            result.add_debug_line(line)

        # Write the jobspec to a temporary file, target for cleanup
        tmpfile = utils.get_tmpfile(prefix="jobspec-")
        utils.write_file(json.dumps(js), tmpfile)

        # and submit with flux job submit <jobspec>
        # We can't really watch here, or do anything with attributes yet
        cmd = ["flux", "job", "submit", tmpfile]
        res = utils.run_command(cmd, check_output=True)

        # Prepare a result to return
        result.out = res["message"].strip()
        result.add_debug_line(" ".join(cmd))

        # Cleanup the files
        self.cleanup(tmpfile)
        return result


class submit(JobBase):
    name = "submit"

    def generate_command(self, waitable=False):
        """
        Convenience function to generate the command.
        This is intended for flux batch to use.
        """
        cmd = self.prepare(waitable=waitable)
        return ["flux", "submit"] + cmd

    def run(self, *args, **kwargs):
        """
        Run the submit step.
        The Python bindings are giving me weird errors.
        """
        cmd = self.generate_command()

        # Are we watching?
        attributes = self.options.get("attributes") or {}
        watch = attributes.get("watch")
        res = utils.run_command(cmd, check_output=True, stream=watch)

        # Prepare a result to return
        result = Result()

        # Return results to print
        if not watch:
            result.out = res["message"].strip()
        result.add_debug_line(" ".join(cmd))
        return result
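
Putting it together: for a group with a single submit task, write_tasks_script would produce a batch-script along these lines (task name and command hypothetical), which batch.run then attaches to the dry-run jobspec and hands to flux job submit:

#!/bin/bash
flux submit --job-name task-1 -N 1 --flags=waitable echo hello
flux job wait --all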