From 7b1fec3ee0dfdd4b82a6bad6bf6db70be7efc404 Mon Sep 17 00:00:00 2001 From: Toby Jennings Date: Wed, 22 Jan 2025 09:42:20 -0600 Subject: [PATCH] WIP: allocator work --- python/lsst/ctrl/execute/allocator.py | 38 ++++++-- python/lsst/ctrl/execute/allocatorParser.py | 11 ++- python/lsst/ctrl/execute/envString.py | 26 ++++-- python/lsst/ctrl/execute/findPackageFile.py | 75 ++++++++++++++++ .../ctrl/execute/libexec/allocateNodes.py | 19 ++-- python/lsst/ctrl/execute/slurmPlugin.py | 86 +++++++------------ tests/test_findPackageFile.py | 30 +++++++ 7 files changed, 198 insertions(+), 87 deletions(-) create mode 100644 python/lsst/ctrl/execute/findPackageFile.py create mode 100644 tests/test_findPackageFile.py diff --git a/python/lsst/ctrl/execute/allocator.py b/python/lsst/ctrl/execute/allocator.py index 6d9c2f9..8d576ac 100644 --- a/python/lsst/ctrl/execute/allocator.py +++ b/python/lsst/ctrl/execute/allocator.py @@ -33,6 +33,7 @@ from lsst.ctrl.execute.allocationConfig import AllocationConfig from lsst.ctrl.execute.condorInfoConfig import CondorInfoConfig from lsst.ctrl.execute.templateWriter import TemplateWriter +from lsst.resources import ResourcePath _LOG = logging.getLogger(__name__) @@ -48,11 +49,18 @@ class Allocator: the name of the platform to execute on opts : `Config` Config object containing options - condorInfoFileName : `str` + condorInfoFileName : `str | lsst.resources.ResourcePath` Name of the file containing Config information + + Raises + ------ + TypeError + If the condorInfoFileName is the wrong type. 
""" - def __init__(self, platform, opts, configuration, condorInfoFileName): + def __init__( + self, platform: str, opts, configuration, condorInfoFileName: str | ResourcePath + ): """Constructor @param platform: target platform for PBS submission @param opts: options to override @@ -61,9 +69,14 @@ def __init__(self, platform, opts, configuration, condorInfoFileName): self.defaults = {} self.configuration = configuration - fileName = envString.resolve(condorInfoFileName) condorInfoConfig = CondorInfoConfig() - condorInfoConfig.load(fileName) + if isinstance(condorInfoFileName, str): + fileName = envString.resolve(condorInfoFileName) + condorInfoConfig.load(fileName) + elif isinstance(condorInfoFileName, ResourcePath): + condorInfoConfig.loadFromStream(condorInfoFileName.read()) + else: + raise TypeError("Wrong type of condor info file provided to allocator.") self.platform = platform @@ -152,15 +165,18 @@ def load(self): ) self.defaults["SCHEDULER"] = self.configuration.platform.scheduler - def loadAllocationConfig(self, name, suffix): + def loadAllocationConfig(self, name: str | ResourcePath, suffix): """Loads all values from allocationConfig and command line overrides into data structures suitable for use by the TemplateWriter object. """ - resolvedName = envString.resolve(name) allocationConfig = AllocationConfig() - if not os.path.exists(resolvedName): - raise RuntimeError("%s was not found." % resolvedName) - allocationConfig.load(resolvedName) + if isinstance(name, str): + resolvedName = envString.resolve(name) + if not os.path.exists(resolvedName): + raise RuntimeError("%s was not found." 
% resolvedName) + allocationConfig.load(resolvedName) + elif isinstance(name, ResourcePath): + allocationConfig.loadFromStream(name.read()) self.defaults["QUEUE"] = allocationConfig.platform.queue self.defaults["EMAIL_NOTIFICATION"] = allocationConfig.platform.email @@ -460,3 +476,7 @@ def runCommand(self, cmd, verbose): # high order bits are status, low order bits are signal. exitCode = (status & 0xFF00) >> 8 return exitCode + + def submit(self): + """Submit the glidein jobs to the Batch system.""" + raise NotImplementedError diff --git a/python/lsst/ctrl/execute/allocatorParser.py b/python/lsst/ctrl/execute/allocatorParser.py index fdb9a8a..ba2eb3d 100644 --- a/python/lsst/ctrl/execute/allocatorParser.py +++ b/python/lsst/ctrl/execute/allocatorParser.py @@ -41,12 +41,9 @@ def __init__(self, basename): """ self.defaults = {} - - self.args = [] - self.args = self.parseArgs(basename) - def parseArgs(self, basename): + def parseArgs(self, basename) -> argparse.Namespace: """Parse command line, and test for required arguments Parameters @@ -60,7 +57,9 @@ def parseArgs(self, basename): """ parser = argparse.ArgumentParser(prog=basename) - parser.add_argument("platform", help="node allocation platform") + parser.add_argument( + "platform", type=str, default="s3df", help="node allocation platform" + ) parser.add_argument( "--auto", action="store_true", @@ -214,7 +213,7 @@ def getArgs(self): Returns ------- - args: `list` + args: `argparse.Namespace` remaining command line arguments """ return self.args diff --git a/python/lsst/ctrl/execute/envString.py b/python/lsst/ctrl/execute/envString.py index 568d522..3dbae8b 100644 --- a/python/lsst/ctrl/execute/envString.py +++ b/python/lsst/ctrl/execute/envString.py @@ -24,23 +24,33 @@ import os import re -import sys -# Given a string, look for any $ prefixed word, attempt to substitute -# an environment variable with that name. 
-# @throw exception if the environment variable doesn't exist -# @return the resulting string +def resolve(input: str) -> str: + """Render a string with any `$`-prefixed words substituted with a matching + environment variable. -def resolve(strVal): + FIXME: this reimplements the `os.path.expandvars()` function with the + exception of raising errors on unresolved variables. + + Parameters + ---------- + input : str + The string containing `$`-prefixed environment variable names + to resolve. + + Raises + ------ + RuntimeError + If the environment variable does not exist + """ p = re.compile(r"\$[a-zA-Z0-9_]+") - retVal = strVal + retVal = input exprs = p.findall(retVal) for i in exprs: var = i[1:] val = os.getenv(var, None) if val is None: raise RuntimeError("couldn't find environment variable " + i) - sys.exit(120) retVal = p.sub(val, retVal, 1) return retVal diff --git a/python/lsst/ctrl/execute/findPackageFile.py b/python/lsst/ctrl/execute/findPackageFile.py new file mode 100644 index 0000000..04d582e --- /dev/null +++ b/python/lsst/ctrl/execute/findPackageFile.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python + +# +# LSST Data Management System +# Copyright 2008-2016 LSST Corporation. +# +# This product includes software developed by the +# LSST Project (http://www.lsst.org/). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the LSST License Statement and +# the GNU General Public License along with this program. If not, +# see <http://www.lsstcorp.org/LegalNotices/>.
+# +import os + +import lsst.utils +from lsst.ctrl.execute.envString import resolve +from lsst.resources import ResourcePath + + +def find_package_file( + filename: str, kind: str = "config", platform: str = "s3df" +) -> ResourcePath: + """Find a package file from a set of candidate locations. + + The candidate locations are, in descending order of preference: + - An `.lsst` directory in the user's home directory. + - An `lsst` directory in the user's `$XDG_CONFIG_HOME` directory + - An `etc/{kind}` directory in the stack environment for the platform + - An `etc/{kind}` directory in an installed `lsst.ctrl.platform.*` package + - An `etc/{kind}` directory in the `lsst.ctrl.execute` package. + + Raises + ------ + IndexError + If a requested file object cannot be located in the candidate hierarchy + """ + _filename = resolve(filename) + home_dir = os.getenv("HOME", "/") + xdg_config_home = os.getenv("XDG_CONFIG_HOME", f"{home_dir}/.config") + try: + platform_pkg_dir = lsst.utils.getPackageDir(f"ctrl_platform_{platform}") + except (LookupError, ValueError): + platform_pkg_dir = None + + file_candidates = [ + ResourcePath(f"file://{home_dir}/.lsst/{_filename}"), + ResourcePath(f"file://{xdg_config_home}/lsst/{_filename}"), + ( + ResourcePath(f"file://{platform_pkg_dir}/etc/{kind}/{_filename}") + if platform_pkg_dir + else None + ), + ResourcePath( + f"resource://lsst.ctrl.platform.{platform}/etc/{kind}/{_filename}" + ), + ResourcePath(f"resource://lsst.ctrl.execute/etc/{kind}/{_filename}"), + ] + try: + found_file: ResourcePath = [ + c for c in file_candidates if c is not None and c.exists() + ][0] + except IndexError: + raise + return found_file diff --git a/python/lsst/ctrl/execute/libexec/allocateNodes.py b/python/lsst/ctrl/execute/libexec/allocateNodes.py index 5443edc..a3022c4 100755 --- a/python/lsst/ctrl/execute/libexec/allocateNodes.py +++ b/python/lsst/ctrl/execute/libexec/allocateNodes.py @@ -23,14 +23,13 @@ # import logging -import os import sys from typing 
import Any -import lsst.utils -from lsst.ctrl.execute import envString +from lsst.ctrl.execute.allocator import Allocator from lsst.ctrl.execute.allocatorParser import AllocatorParser from lsst.ctrl.execute.condorConfig import CondorConfig +from lsst.ctrl.execute.findPackageFile import find_package_file from lsst.ctrl.execute.namedClassFactory import NamedClassFactory _LOG = logging.getLogger("lsst.ctrl.execute") @@ -70,12 +69,9 @@ def main(): platform = p.getPlatform() # load the CondorConfig file - platformPkgDir = lsst.utils.getPackageDir("ctrl_platform_" + platform) - execConfigName = os.path.join(platformPkgDir, "etc", "config", "execConfig.py") - - resolvedName = envString.resolve(execConfigName) + execConfigName = find_package_file("execConfig.py", platform=platform) configuration = CondorConfig() - configuration.load(resolvedName) + configuration.loadFromStream(execConfigName.read()) # create the plugin class schedulerName = configuration.platform.scheduler @@ -84,12 +80,13 @@ def main(): ) # create the plugin - scheduler = schedulerClass( - platform, p.getArgs(), configuration, "$HOME/.lsst/condor-info.py" + condor_info_file = find_package_file("condor-info.py", platform=platform) + scheduler: Allocator = schedulerClass( + platform, p.getArgs(), configuration, condor_info_file ) # submit the request - scheduler.submit(platform, platformPkgDir) + scheduler.submit() if __name__ == "__main__": diff --git a/python/lsst/ctrl/execute/slurmPlugin.py b/python/lsst/ctrl/execute/slurmPlugin.py index 324e289..69f1300 100644 --- a/python/lsst/ctrl/execute/slurmPlugin.py +++ b/python/lsst/ctrl/execute/slurmPlugin.py @@ -27,11 +27,14 @@ import subprocess import sys import time +from pathlib import Path from string import Template import htcondor from lsst.ctrl.bps.htcondor import condor_q from lsst.ctrl.execute.allocator import Allocator +from lsst.ctrl.execute.findPackageFile import find_package_file +from lsst.resources import ResourcePath _LOG = 
logging.getLogger(__name__) @@ -99,14 +102,9 @@ def countRunningSlurmJobs(jobname): numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="R") return numberOfJobs - def createFilesFromTemplates(self, platformPkgDir): + def createFilesFromTemplates(self): """Create the Slurm submit, script, and htcondor config files - Parameters - ---------- - platformPkgDir : `str` - path to the ctrl_platform package being used - Returns ------- generatedSlurmFile : `str` @@ -118,20 +116,20 @@ def createFilesFromTemplates(self, platformPkgDir): template.substitute(USER_HOME=self.getUserHome()) # create the slurm submit file - slurmName = os.path.join( - platformPkgDir, "etc", "templates", "generic.slurm.template" + slurmName = find_package_file( + "generic.slurm.template", kind="templates", platform=self.platform ) generatedSlurmFile = self.createSubmitFile(slurmName) # create the condor configuration file - condorFile = os.path.join( - platformPkgDir, "etc", "templates", "glidein_condor_config.template" + condorFile = find_package_file( + "glidein_condor_config.template", kind="templates", platform=self.platform ) self.createCondorConfigFile(condorFile) # create the script that the slurm submit file calls - allocationName = os.path.join( - platformPkgDir, "etc", "templates", "allocation.sh.template" + allocationName = find_package_file( + "allocation.sh.template", kind="templates", platform=self.platform ) self.createAllocationFile(allocationName) @@ -139,19 +137,11 @@ def createFilesFromTemplates(self, platformPkgDir): return generatedSlurmFile - def submit(self, platform, platformPkgDir): - """Submit the glidein jobs to the Batch system - - Parameters - ---------- - platform : `str` - name of the target compute platform - platformPkgDir : `str` - path to the ctrl_platform package being used - """ - configName = os.path.join(platformPkgDir, "etc", "config", "slurmConfig.py") + def submit(self): + """Submit the glidein jobs to the Batch system.""" + configName = 
find_package_file("slurmConfig.py", platform=self.platform) - self.loadSlurm(configName, platformPkgDir) + self.loadSlurm(configName) verbose = self.isVerbose() auto = self.isAuto() @@ -161,12 +151,10 @@ def submit(self, platform, platformPkgDir): # run the sbatch command template = Template(self.getLocalScratchDirectory()) - localScratchDir = template.substitute(USER_SCRATCH=self.getUserScratch()) - slurmSubmitDir = os.path.join(localScratchDir, self.defaults["DATE_STRING"]) - if not os.path.exists(localScratchDir): - os.mkdir(localScratchDir) - if not os.path.exists(slurmSubmitDir): - os.mkdir(slurmSubmitDir) + localScratchDir = Path(template.substitute(USER_SCRATCH=self.getUserScratch())) + slurmSubmitDir = localScratchDir / self.defaults["DATE_STRING"] + localScratchDir.mkdir(exist_ok=True) + slurmSubmitDir.mkdir(exist_ok=True) os.chdir(slurmSubmitDir) _LOG.debug( "The working local scratch directory localScratchDir is %s ", @@ -180,9 +168,9 @@ def submit(self, platform, platformPkgDir): _LOG.debug("The user home directory is %s", self.getUserHome()) if auto: - self.glideinsFromJobPressure(platformPkgDir) + self.glideinsFromJobPressure() else: - generatedSlurmFile = self.createFilesFromTemplates(platformPkgDir) + generatedSlurmFile = self.createFilesFromTemplates() cmd = "sbatch --mem %s %s" % (totalMemory, generatedSlurmFile) nodes = self.getNodes() # In this case 'nodes' is the Target. 
@@ -225,7 +213,7 @@ def submit(self, platform, platformPkgDir): _LOG.error("error running %s", cmd) sys.exit(exitCode) - def loadSlurm(self, name, platformPkgDir): + def loadSlurm(self, name): if self.opts.reservation is not None: self.defaults["RESERVATION"] = ( "#SBATCH --reservation %s" % self.opts.reservation @@ -244,12 +232,10 @@ def loadSlurm(self, name, platformPkgDir): scratchDir = template.substitute(USER_SCRATCH=self.getUserScratch()) self.defaults["SCRATCH_DIR"] = scratchDir - self.allocationFileName = os.path.join( - self.configDir, "allocation_%s.sh" % self.uniqueIdentifier - ) - self.defaults["GENERATED_ALLOCATE_SCRIPT"] = os.path.basename( - self.allocationFileName + self.allocationFileName = ( + Path(self.configDir) / f"allocation_{self.uniqueIdentifier}.sh" ) + self.defaults["GENERATED_ALLOCATE_SCRIPT"] = self.allocationFileName.name if self.opts.openfiles is None: self.defaults["OPEN_FILES"] = 20480 @@ -266,22 +252,22 @@ def loadSlurm(self, name, platformPkgDir): self.defaults["PACK_BLOCK"] = "Rank = TotalCpus - Cpus" # handle dynamic slot block template: - # 1) if it isn't specified, just put a comment in it's place + # 1) if it isn't specified, just put a comment in its place # 2) if it's specified, but without a filename, use the default # 3) if it's specified with a filename, use that. 
- dynamicSlotsName = None if self.opts.dynamic is None: self.defaults["DYNAMIC_SLOTS_BLOCK"] = "#" return + dynamicSlotsName: Path | ResourcePath if self.opts.dynamic == "__default__": - dynamicSlotsName = os.path.join( - platformPkgDir, "etc", "templates", "dynamic_slots.template" + dynamicSlotsName = find_package_file( + "dynamic_slots.template", kind="templates", platform=self.platform ) else: - dynamicSlotsName = self.opts.dynamic + dynamicSlotsName = Path(self.opts.dynamic) - with open(dynamicSlotsName) as f: + with dynamicSlotsName.open() as f: lines = f.readlines() block = "" for line in lines: @@ -301,14 +287,8 @@ def createAllocationFile(self, input): os.chmod(outfile, 0o755) return outfile - def glideinsFromJobPressure(self, platformPkgDir): - """Determine and submit the glideins needed from job pressure - - Parameters - ---------- - platformPkgDir : `str` - path to the ctrl_platform package being used - """ + def glideinsFromJobPressure(self): + """Determine and submit the glideins needed from job pressure.""" verbose = self.isVerbose() autoCPUs = self.getAutoCPUs() @@ -354,7 +334,7 @@ def glideinsFromJobPressure(self, platformPkgDir): _LOG.info("Auto: No HTCondor Jobs detected.") return - generatedSlurmFile = self.createFilesFromTemplates(platformPkgDir) + generatedSlurmFile = self.createFilesFromTemplates() condorq_large = [] condorq_small = [] schedd_name, condorq_full = condorq_data.popitem() diff --git a/tests/test_findPackageFile.py b/tests/test_findPackageFile.py new file mode 100644 index 0000000..06e4dd9 --- /dev/null +++ b/tests/test_findPackageFile.py @@ -0,0 +1,30 @@ +import pytest +from lsst.ctrl.execute.findPackageFile import find_package_file + + +def test_find_package_file(tmp_path, monkeypatch): + mock_home = tmp_path / "home" / "pytest" + (mock_home / ".lsst").mkdir(parents=True) + (mock_home / ".config" / "lsst").mkdir(parents=True) + + mock_lsst_file = mock_home / ".lsst" / "test_file_1.py" + mock_xdg_file = mock_home / ".config" / 
"lsst" / "test_file_2.py" + + mock_lsst_file.touch() + mock_xdg_file.touch() + + monkeypatch.setenv("HOME", str(mock_home)) + monkeypatch.setenv("XDG_CONFIG_HOME", str(mock_home / ".config")) + + f1 = find_package_file("test_file_1.py") + f2 = find_package_file("test_file_2.py") + + # f1 should be found in the `~/.lsst` directory + assert "home/pytest/.lsst" in str(f1) + + # f2 should be found in the `~/.config/lsst` directory + assert "home/pytest/.config/lsst" in str(f2) + + # f3 should not be found at all + with pytest.raises(IndexError): + _ = find_package_file("test_file_3.py")