Skip to content

Commit

Permalink
Merge pull request #11445 from amaltaro/wmagent216-bundle1
Browse files Browse the repository at this point in the history
Bundle of WMAgent patches for 2.1.6.1
  • Loading branch information
amaltaro authored Jan 18, 2023
2 parents c316b1b + 45bee53 commit de0d184
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 33 deletions.
9 changes: 2 additions & 7 deletions src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def uploadWorker(workInput, results, dbsUrl, gzipEncoding=False):
if srvCode == 128:
# block already exist
logging.warning("Block %s already exists. Marking it as uploaded.", name)
results.put({'name': name, 'success': "uploaded"})
results.put({'name': name, 'success': "check"})
elif srvCode in [132, 133, 134, 135, 136, 137, 138, 139, 140]:
# racing conditions
logging.warning("Hit a transient data race condition injecting block %s, %s", name, msg)
Expand All @@ -114,7 +114,7 @@ def uploadWorker(workInput, results, dbsUrl, gzipEncoding=False):
logging.error(msg)
results.put({'name': name, 'success': "error", 'error': msg})
except Exception as ex:
msg = f"Hit a general exception while inserting block {name. Error: {str(ex}}"
msg = f"Hit a general exception while inserting block {name}. Error: {str(ex)}"
logging.exception(msg)
results.put({'name': name, 'success': "error", 'error': msg})
return
Expand Down Expand Up @@ -766,11 +766,6 @@ def retrieveBlocks(self):
elif result["success"] == "check":
block = result["name"]
self.blocksToCheck.append(block)
else:
logging.error("Error found in multiprocess during process of block %s", result.get('name'))
logging.error(result['error'])
# Continue to the next block
# Block will remain in pending status until it is transferred

if loadedBlocks:
try:
Expand Down
57 changes: 31 additions & 26 deletions src/python/WMComponent/JobAccountant/AccountantWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ def handleJob(self, jobID, fwkJobReport):
outputModules = set([])
for fwjrFile in fileList:
outputModules.add(fwjrFile['outputModule'] + fwjrFile['dataset'].get('dataTier', ''))

if set(outputMap) == outputModules:
pass
elif jobType == "LogCollect" and not outputMap and outputModules == {'LogCollect'}:
Expand All @@ -474,38 +475,42 @@ def handleJob(self, jobID, fwkJobReport):
elif jobType == "Express" and set(outputMap).difference(outputModules) == {'write_RAWRAW'}:
pass
else:
failJob = True
# any job that is not multi-step and Processing/Production must FAIL!
if jobType in ["Processing", "Production"]:
cmsRunSteps = 0
for step in fwkJobReport.listSteps():
if step.startswith("cmsRun"):
cmsRunSteps += 1
if cmsRunSteps > 1:
failJob = False

if failJob:
jobSuccess = False
logging.error("Job %d , list of expected outputModules does not match job report, failing job",
jobID)
logging.debug("Job %d , expected outputModules %s", jobID, sorted(outputMap.keys()))
logging.debug("Job %d , fwjr outputModules %s", jobID, sorted(outputModules))
fileList = fwkJobReport.getAllFilesFromStep(step='logArch1')
cmsRunSteps = sum([1 for step in fwkJobReport.listSteps() if step.startswith("cmsRun")])
if cmsRunSteps == 1:
jobSuccess = False
else:
msg = f"Job {jobID} accepted for multi-step CMSSW, even though "
msg += "the expected outputModules does not match content of the FWJR."
logging.warning(msg)
else:
logging.warning(
"Job %d , list of expected outputModules does not match job report, accepted for multi-step CMSSW job",
jobID)
# Make sure every file has a valid location
# see https://github.com/dmwm/WMCore/issues/9353
for fwjrFile in fileList:
# T0 has analysis file without any location, see:
# https://github.com/dmwm/WMCore/issues/9497
if not fwjrFile.get("locations") and fwjrFile.get("lfn", "").endswith(".root"):
logging.warning("The following file doesn't have any location: %s", fwjrFile)
jobSuccess = False
break

if jobSuccess is False:
sortedOutputMap = sorted(outputMap.keys())
errMsg = f"Job {jobID}, expected output modules: {sortedOutputMap}, "
errMsg += f"but has FWJR output modules: {sorted(outputModules)}"
logging.error(errMsg)
# override file list by the logArch1 output only
fileList = fwkJobReport.getAllFilesFromStep(step='logArch1')
else:
fileList = fwkJobReport.getAllFilesFromStep(step='logArch1')

# Make sure every file has a valid location
# see https://github.com/dmwm/WMCore/issues/9353
newList = []
for fwjrFile in fileList:
# T0 has analysis file without any location, see:
# https://github.com/dmwm/WMCore/issues/9497
if not fwjrFile.get("locations") and fwjrFile.get("lfn", "").endswith(".root"):
logging.warning("The following file does not have any location: %s", fwjrFile)
jobSuccess = False
else:
newList.append(fwjrFile)
# save the new list free of ill files (without any location)
fileList = newList

if jobSuccess:
logging.info("Job %d , handle successful job", jobID)
else:
Expand Down

0 comments on commit de0d184

Please sign in to comment.