From 309d5b3993513bd09ccfc9a8bb950e4af7abe5be Mon Sep 17 00:00:00 2001 From: Vanessasaurus <814322+vsoch@users.noreply.github.com> Date: Wed, 12 Jun 2024 11:36:25 -0600 Subject: [PATCH] feat: support for environment (#18) * feat: support for environment * attributes are under task * add support for parsing script Signed-off-by: vsoch --- jobspec/transformer/flux/steps.py | 37 ++++++++++++++++++++++++++----- jobspec/utils.py | 8 +++---- jobspec/version.py | 2 +- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/jobspec/transformer/flux/steps.py b/jobspec/transformer/flux/steps.py index e2c1869..4e09b09 100644 --- a/jobspec/transformer/flux/steps.py +++ b/jobspec/transformer/flux/steps.py @@ -1,6 +1,8 @@ import copy import json import os +import re +import shlex import uuid import jobspec.utils as utils @@ -75,6 +77,21 @@ def cleanup(self, filename): if os.path.exists(filename): os.remove(filename) + def write_job_script(self, command): + """ + Given a bash, shell, or python command, write + into a script. + """ + command = command.strip() + match = re.match("#!/bin/(?Pbash|sh|python)", command) + terms = match.groupdict() + tmpfile = utils.get_tmpfile(prefix="jobscript-", suffix=".sh") + + # Clean self up (commented out because makes me nervous) + command += f"\n# rm -rf {tmpfile}" + utils.write_file(command, tmpfile) + return [terms["executable"], tmpfile] + def prepare(self, command=None, waitable=False): """ Return the command, without flux submit|batch @@ -83,10 +100,8 @@ def prepare(self, command=None, waitable=False): # We can get the resources from options resources = self.options.get("resources") - - # These aren't used yet - they need to go into flux - attributes = self.options.get("attributes") or {} task = self.options.get("task") or {} + attributes = task.get("attributes") or {} # This flattens to be what we ask flux for slot = resources.flatten_slot() @@ -100,6 +115,10 @@ def prepare(self, command=None, waitable=False): cwd = attributes.get("cwd") watch = attributes.get("watch") + # Environment + for key, value in attributes.get("environment", {}).items(): + cmd += [f"--env={key}={value}"] + # Note that you need to install our frobnicator plugin # for this to work. See the examples/depends_on directory for depends_on in task.get("depends_on") or []: @@ -135,8 +154,15 @@ def prepare(self, command=None, waitable=False): # Right now assume command is required if not command: command = task["command"] + + # Case 1: we are given a script to write + if isinstance(command, str) and re.search("#!/bin/(bash|sh|python)", command): + command = self.write_job_script(command) + + # String that should be a list if isinstance(command, str): - command = [command] + command = shlex.split(command) + cmd += command return cmd @@ -266,7 +292,8 @@ def run(self, *args, **kwargs): cmd = self.generate_command() # Are we watching? - attributes = self.options.get("attributes") or {} + task = self.options.get("task") or {} + attributes = task.get("attributes") or {} watch = attributes.get("watch") res = utils.run_command(cmd, check_output=True, stream=watch) diff --git a/jobspec/utils.py b/jobspec/utils.py index 674ff34..86c2eb9 100644 --- a/jobspec/utils.py +++ b/jobspec/utils.py @@ -40,7 +40,7 @@ def recursive_find(base, pattern="[.]py"): yield filepath -def get_tmpfile(tmpdir=None, prefix=""): +def get_tmpfile(tmpdir=None, prefix="", suffix=None): """ Get a temporary file with an optional prefix. """ @@ -51,7 +51,7 @@ def get_tmpfile(tmpdir=None, prefix=""): if tmpdir: prefix = os.path.join(tmpdir, os.path.basename(prefix)) - fd, tmp_file = tempfile.mkstemp(prefix=prefix) + fd, tmp_file = tempfile.mkstemp(prefix=prefix, suffix=suffix) os.close(fd) return tmp_file @@ -62,7 +62,7 @@ def get_tmpdir(tmpdir=None, prefix="", create=True): Get a temporary directory for an operation. """ tmpdir = tmpdir or tempfile.gettempdir() - prefix = prefix or "jobspec-" + prefix = prefix or "jobspec" prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names())) tmpdir = os.path.join(tmpdir, prefix) @@ -120,7 +120,7 @@ def run_command(cmd, stream=False, check_output=False, return_code=0): If check_output is True, check against an expected return code. """ stdout = subprocess.PIPE if not stream else None - output = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=stdout) + output = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=stdout, env=os.environ.copy()) t = output.communicate()[0], output.returncode output = {"message": t[0], "return_code": t[1]} diff --git a/jobspec/version.py b/jobspec/version.py index 8ba3e98..f63fbd4 100644 --- a/jobspec/version.py +++ b/jobspec/version.py @@ -1,4 +1,4 @@ -__version__ = "0.1.14" +__version__ = "0.1.15" AUTHOR = "Vanessa Sochat" AUTHOR_EMAIL = "vsoch@users.noreply.github.com" NAME = "jobspec"