diff --git a/python/lsst/ctrl/execute/allocator.py b/python/lsst/ctrl/execute/allocator.py index 8342d81..6f2fd06 100644 --- a/python/lsst/ctrl/execute/allocator.py +++ b/python/lsst/ctrl/execute/allocator.py @@ -260,6 +260,12 @@ def isVerbose(self): """ return self.opts.verbose + def isAuto(self): + """Status of the auto flag + @return True if the flag was set, False otherwise + """ + return self.opts.auto + def getUserName(self): """Accessor for USER_NAME @return the value of USER_NAME diff --git a/python/lsst/ctrl/execute/allocatorParser.py b/python/lsst/ctrl/execute/allocatorParser.py index 94f7acc..1c496d7 100644 --- a/python/lsst/ctrl/execute/allocatorParser.py +++ b/python/lsst/ctrl/execute/allocatorParser.py @@ -61,6 +61,12 @@ def parseArgs(self, basename): parser = argparse.ArgumentParser(prog=basename) parser.add_argument("platform", help="node allocation platform") + parser.add_argument( + "--auto", + action="store_true", + dest="auto", + help="use automatic detection of jobs to determine glide-ins", + ) parser.add_argument( "-n", "--node-count", diff --git a/python/lsst/ctrl/execute/slurmPlugin.py b/python/lsst/ctrl/execute/slurmPlugin.py index 06fa8f7..4ed8b5d 100644 --- a/python/lsst/ctrl/execute/slurmPlugin.py +++ b/python/lsst/ctrl/execute/slurmPlugin.py @@ -36,6 +36,7 @@ def submit(self, platform, platformPkgDir): self.loadSlurm(configName, platformPkgDir) verbose = self.isVerbose() + auto = self.isAuto() # create the fully-resolved scratch directory string scratchDirParam = self.getScratchDirectory() @@ -60,11 +61,9 @@ def submit(self, platform, platformPkgDir): ) self.createAllocationFile(allocationName) - nodes = self.getNodes() cpus = self.getCPUs() memoryPerCore = self.getMemoryPerCore() totalMemory = cpus * memoryPerCore - print("Targeting %s glidein(s) for the computing pool/set." % nodes) # run the sbatch command template = Template(self.getLocalScratchDirectory()) @@ -89,14 +88,26 @@ def submit(self, platform, platformPkgDir): print("The Slurm job name for the glidein jobs is %s " % jobname) print("The user home directory is %s " % self.getUserHome()) - batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"]) - result = subprocess.check_output(batcmd, shell=True) - strResult = result.decode("UTF-8") + if auto: + numberToAdd = self.glideinsFromJobPressure() + print("The number of glidein jobs to submit now is %s" % numberToAdd) + else: + nodes = self.getNodes() + # In this case 'nodes' is the Target. + print("Targeting %s glidein(s) for the computing pool/set." % nodes) + + batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"]) + print("The squeue command is: %s " % batcmd) + try: + result = subprocess.check_output(batcmd, shell=True) + except subprocess.CalledProcessError as e: + print(e.output) + strResult = result.decode("UTF-8") - print("Detected this number of preexisting glidein jobs: %s " % strResult) + print("Detected this number of preexisting glidein jobs: %s " % strResult) - numberToAdd = nodes - int(strResult) - print("The number of glidein jobs to submit now is %s" % numberToAdd) + numberToAdd = nodes - int(strResult) + print("The number of glidein jobs to submit now is %s" % numberToAdd) for glide in range(0, numberToAdd): print("Submitting glidein %s " % glide) @@ -162,3 +173,146 @@ def createAllocationFile(self, input): print("Wrote new Slurm job allocation bash script to %s" % outfile) os.chmod(outfile, 0o755) return outfile + + def glideinsFromJobPressure(self): + """Calculate the number of glideins needed from job pressure + + Returns + ------- + number : `str` + The number of glideins + """ + + import math + import socket + + import htcondor + from lsst.ctrl.bps.htcondor import condor_q + + verbose = self.isVerbose() + + maxNumberOfGlideins = self.getNodes() + coresPerGlidein = self.getCPUs() + ratioMemCore = self.getMemoryPerCore() + auser = self.getUserName() + + # initialize counters + totalCores = 0 + + try: + schedd_name = socket.getfqdn() + coll = htcondor.Collector() + schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd) + scheddref = htcondor.Schedd(schedd_ad) + # projection contains the job classads to be returned. + # These include the cpu and memory profile of each job, + # in the form of RequestCpus and RequestMemory + projection = [ + "JobStatus", + "Owner", + "RequestCpus", + "JobUniverse", + "RequestMemory", + ] + owner = f'(Owner=="{auser}") ' + jstat = "&& (JobStatus==1) " + juniv = "&& (JobUniverse==5)" + # The constraint determines that the jobs to be returned belong to + # the current user (Owner) and are Idle vanilla universe jobs. + full_constraint = f"{owner}{jstat}{juniv}" + if verbose: + print(f"full_constraint {full_constraint}") + condorq_data = condor_q( + constraint=full_constraint, + schedds={schedd_name: scheddref}, + projection=projection, + ) + if len(condorq_data) > 0: + print("glideinsFromJobPressure: Fetched") + condorq_bps = condorq_data[schedd_name] + if verbose: + print(len(condorq_bps)) + print(condorq_bps) + # disassemble the dictionary of dictionaries + for jid in list(condorq_bps.keys()): + job = condorq_bps[jid] + thisCores = job["RequestCpus"] + thisMemory = job["RequestMemory"] + totalCores = totalCores + thisCores + if verbose: + print( + f"glideinsFromJobPressure: The key in the dictionary is {jid}" + ) + print(f"\tRequestCpus {thisCores}") + print(f"\tCurrent value of totalCores {totalCores}") + thisRatio = thisMemory / ratioMemCore + if thisRatio > thisCores: + if verbose: + print("\t\tNeed to Add More:") + print(f"\t\tRequestMemory is {thisMemory} ") + print(f"\t\tRatio to {ratioMemCore} MB is {thisRatio} ") + totalCores = totalCores + (thisRatio - thisCores) + if verbose: + print(f"\t\tCurrent value of totalCores {totalCores}") + + else: + print("Length Zero") + print(len(condorq_data)) + except Exception as exc: + raise type(exc)("Problem querying condor schedd for jobs") from None + + print(f"glideinsFromJobPressure: The final TotalCores is {totalCores}") + numberOfGlideins = math.ceil(totalCores / coresPerGlidein) + print( + f"glideinsFromJobPressure: Target # Glideins for Idle Jobs is {numberOfGlideins}" + ) + + # Check Slurm queue Running glideins + jobname = f"glide_{auser}" + existingGlideinsRunning = 0 + batcmd = f"squeue --noheader --states=R --name={jobname} | wc -l" + print("The squeue command is: %s " % batcmd) + try: + resultR = subprocess.check_output(batcmd, shell=True) + except subprocess.CalledProcessError as e: + print(e.output) + existingGlideinsRunning = int(resultR.decode("UTF-8")) + + # Check Slurm queue Idle Glideins + existingGlideinsIdle = 0 + batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l" + print("The squeue command is: %s " % batcmd) + try: + resultPD = subprocess.check_output(batcmd, shell=True) + except subprocess.CalledProcessError as e: + print(e.output) + existingGlideinsIdle = int(resultPD.decode("UTF-8")) + + print( + f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}" + ) + print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}") + numberOfGlideinsRed = numberOfGlideins - existingGlideinsIdle + + print( + f"glideinsFromJobPressure: Target # Glideins Max to Submit {numberOfGlideinsRed}" + ) + + maxIdleGlideins = maxNumberOfGlideins - existingGlideinsRunning + maxSubmitGlideins = maxIdleGlideins - existingGlideinsIdle + + print(f"glideinsFromJobPressure: maxNumberOfGlideins {maxNumberOfGlideins}") + print( + f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}" + ) + print(f"glideinsFromJobPressure: maxIdleGlideins {maxIdleGlideins}") + print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}") + print(f"glideinsFromJobPressure: maxSubmitGlideins {maxSubmitGlideins}") + + if numberOfGlideinsRed > maxSubmitGlideins: + numberOfGlideinsRed = maxSubmitGlideins + + print( + f"glideinsFromJobPressure: The number of Glideins to submit now is {numberOfGlideinsRed}" + ) + return numberOfGlideinsRed diff --git a/ups/ctrl_execute.table b/ups/ctrl_execute.table index a92d7e0..f854b52 100644 --- a/ups/ctrl_execute.table +++ b/ups/ctrl_execute.table @@ -1,5 +1,6 @@ setupRequired(pex_config) setupRequired(utils) +setupRequired(ctrl_bps_htcondor) envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python) envPrepend(PATH, ${PRODUCT_DIR}/bin)