Skip to content

Commit

Permalink
feat: support for environment (#18)
Browse files Browse the repository at this point in the history
* feat: support for environment
* attributes are under task
* add support for parsing script

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch authored Jun 12, 2024
1 parent 6bccdf2 commit 309d5b3
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 10 deletions.
37 changes: 32 additions & 5 deletions jobspec/transformer/flux/steps.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import copy
import json
import os
import re
import shlex
import uuid

import jobspec.utils as utils
Expand Down Expand Up @@ -75,6 +77,21 @@ def cleanup(self, filename):
if os.path.exists(filename):
os.remove(filename)

def write_job_script(self, command):
"""
Given a bash, shell, or python command, write
into a script.
"""
command = command.strip()
match = re.match("#!/bin/(?P<executable>bash|sh|python)", command)
terms = match.groupdict()
tmpfile = utils.get_tmpfile(prefix="jobscript-", suffix=".sh")

# Clean self up (commented out because makes me nervous)
command += f"\n# rm -rf {tmpfile}"
utils.write_file(command, tmpfile)
return [terms["executable"], tmpfile]

def prepare(self, command=None, waitable=False):
"""
Return the command, without flux submit|batch
Expand All @@ -83,10 +100,8 @@ def prepare(self, command=None, waitable=False):

# We can get the resources from options
resources = self.options.get("resources")

# These aren't used yet - they need to go into flux
attributes = self.options.get("attributes") or {}
task = self.options.get("task") or {}
attributes = task.get("attributes") or {}

# This flattens to be what we ask flux for
slot = resources.flatten_slot()
Expand All @@ -100,6 +115,10 @@ def prepare(self, command=None, waitable=False):
cwd = attributes.get("cwd")
watch = attributes.get("watch")

# Environment
for key, value in attributes.get("environment", {}).items():
cmd += [f"--env={key}={value}"]

# Note that you need to install our frobnicator plugin
# for this to work. See the examples/depends_on directory
for depends_on in task.get("depends_on") or []:
Expand Down Expand Up @@ -135,8 +154,15 @@ def prepare(self, command=None, waitable=False):
# Right now assume command is required
if not command:
command = task["command"]

# Case 1: we are given a script to write
if isinstance(command, str) and re.search("#!/bin/(bash|sh|python)", command):
command = self.write_job_script(command)

# String that should be a list
if isinstance(command, str):
command = [command]
command = shlex.split(command)

cmd += command
return cmd

Expand Down Expand Up @@ -266,7 +292,8 @@ def run(self, *args, **kwargs):
cmd = self.generate_command()

# Are we watching?
attributes = self.options.get("attributes") or {}
task = self.options.get("task") or {}
attributes = task.get("attributes") or {}
watch = attributes.get("watch")
res = utils.run_command(cmd, check_output=True, stream=watch)

Expand Down
8 changes: 4 additions & 4 deletions jobspec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def recursive_find(base, pattern="[.]py"):
yield filepath


def get_tmpfile(tmpdir=None, prefix=""):
def get_tmpfile(tmpdir=None, prefix="", suffix=None):
"""
Get a temporary file with an optional prefix.
"""
Expand All @@ -51,7 +51,7 @@ def get_tmpfile(tmpdir=None, prefix=""):
if tmpdir:
prefix = os.path.join(tmpdir, os.path.basename(prefix))

fd, tmp_file = tempfile.mkstemp(prefix=prefix)
fd, tmp_file = tempfile.mkstemp(prefix=prefix, suffix=suffix)
os.close(fd)

return tmp_file
Expand All @@ -62,7 +62,7 @@ def get_tmpdir(tmpdir=None, prefix="", create=True):
Get a temporary directory for an operation.
"""
tmpdir = tmpdir or tempfile.gettempdir()
prefix = prefix or "jobspec-"
prefix = prefix or "jobspec"
prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names()))
tmpdir = os.path.join(tmpdir, prefix)

Expand Down Expand Up @@ -120,7 +120,7 @@ def run_command(cmd, stream=False, check_output=False, return_code=0):
If check_output is True, check against an expected return code.
"""
stdout = subprocess.PIPE if not stream else None
output = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=stdout)
output = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=stdout, env=os.environ.copy())
t = output.communicate()[0], output.returncode
output = {"message": t[0], "return_code": t[1]}

Expand Down
2 changes: 1 addition & 1 deletion jobspec/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.1.14"
__version__ = "0.1.15"
AUTHOR = "Vanessa Sochat"
AUTHOR_EMAIL = "[email protected]"
NAME = "jobspec"
Expand Down

0 comments on commit 309d5b3

Please sign in to comment.