From f36b90cc2a4b2ec83783552afdff7333c7055977 Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Wed, 11 Jan 2023 16:51:39 -0500 Subject: [PATCH 1/3] Ensure JobAccountant does not insert files without any location apply Valentins suggestions one more Valentins suggestion Partially revert previous logic for multi-cmsRun jobs rename variable --- .../JobAccountant/AccountantWorker.py | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/src/python/WMComponent/JobAccountant/AccountantWorker.py b/src/python/WMComponent/JobAccountant/AccountantWorker.py index 493fb087d9..94d55c099f 100644 --- a/src/python/WMComponent/JobAccountant/AccountantWorker.py +++ b/src/python/WMComponent/JobAccountant/AccountantWorker.py @@ -461,6 +461,7 @@ def handleJob(self, jobID, fwkJobReport): outputModules = set([]) for fwjrFile in fileList: outputModules.add(fwjrFile['outputModule'] + fwjrFile['dataset'].get('dataTier', '')) + if set(outputMap) == outputModules: pass elif jobType == "LogCollect" and not outputMap and outputModules == {'LogCollect'}: @@ -474,38 +475,42 @@ def handleJob(self, jobID, fwkJobReport): elif jobType == "Express" and set(outputMap).difference(outputModules) == {'write_RAWRAW'}: pass else: - failJob = True + # any job that is not multi-step and Processing/Production must FAIL! if jobType in ["Processing", "Production"]: - cmsRunSteps = 0 - for step in fwkJobReport.listSteps(): - if step.startswith("cmsRun"): - cmsRunSteps += 1 - if cmsRunSteps > 1: - failJob = False - - if failJob: - jobSuccess = False - logging.error("Job %d , list of expected outputModules does not match job report, failing job", - jobID) - logging.debug("Job %d , expected outputModules %s", jobID, sorted(outputMap.keys())) - logging.debug("Job %d , fwjr outputModules %s", jobID, sorted(outputModules)) - fileList = fwkJobReport.getAllFilesFromStep(step='logArch1') + cmsRunSteps = sum([1 for step in fwkJobReport.listSteps() if step.startswith("cmsRun")]) + if cmsRunSteps == 1: + jobSuccess = False + else: + msg = f"Job {jobID} accepted for multi-step CMSSW, even though " + msg += "the expected outputModules does not match content of the FWJR." + logging.warning(msg) else: - logging.warning( - "Job %d , list of expected outputModules does not match job report, accepted for multi-step CMSSW job", - jobID) - # Make sure every file has a valid location - # see https://github.com/dmwm/WMCore/issues/9353 - for fwjrFile in fileList: - # T0 has analysis file without any location, see: - # https://github.com/dmwm/WMCore/issues/9497 - if not fwjrFile.get("locations") and fwjrFile.get("lfn", "").endswith(".root"): - logging.warning("The following file doesn't have any location: %s", fwjrFile) jobSuccess = False - break + + if jobSuccess is False: + sortedOutputMap = sorted(outputMap.keys()) + errMsg = f"Job {jobID}, expected output modules: {sortedOutputMap}, " + errMsg += f"but has FWJR output modules: {sorted(outputModules)}" + logging.error(errMsg) + # override file list by the logArch1 output only + fileList = fwkJobReport.getAllFilesFromStep(step='logArch1') else: fileList = fwkJobReport.getAllFilesFromStep(step='logArch1') + # Make sure every file has a valid location + # see https://github.com/dmwm/WMCore/issues/9353 + newList = [] + for fwjrFile in fileList: + # T0 has analysis file without any location, see: + # https://github.com/dmwm/WMCore/issues/9497 + if not fwjrFile.get("locations") and fwjrFile.get("lfn", "").endswith(".root"): + logging.warning("The following file does not have any location: %s", fwjrFile) + jobSuccess = False + else: + newList.append(fwjrFile) + # save the new list free of ill files (without any location) + fileList = newList + if jobSuccess: logging.info("Job %d , handle successful job", jobID) else: From a558fc3d172d03e98d1920954b0ce3701ccb36dd Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Wed, 11 Jan 2023 18:21:47 -0500 Subject: [PATCH 2/3] Fix broken syntax for the f-string in DBSUploadPoller --- src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py b/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py index 66cebddcdc..5b0e2a7646 100644 --- a/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py +++ b/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py @@ -114,7 +114,7 @@ def uploadWorker(workInput, results, dbsUrl, gzipEncoding=False): logging.error(msg) results.put({'name': name, 'success': "error", 'error': msg}) except Exception as ex: - msg = f"Hit a general exception while inserting block {name. Error: {str(ex}}" + msg = f"Hit a general exception while inserting block {name}. Error: {str(ex)}" logging.exception(msg) results.put({'name': name, 'success': "error", 'error': msg}) return From 45bee536821ce566725e72ced5084ce36a003ee1 Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Tue, 17 Jan 2023 14:07:53 -0500 Subject: [PATCH 3/3] Do not duplicate error messages in DBS3Upload --- src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py b/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py index 5b0e2a7646..65aac645b5 100644 --- a/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py +++ b/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py @@ -104,7 +104,7 @@ def uploadWorker(workInput, results, dbsUrl, gzipEncoding=False): if srvCode == 128: # block already exist logging.warning("Block %s already exists. Marking it as uploaded.", name) - results.put({'name': name, 'success': "uploaded"}) + results.put({'name': name, 'success': "check"}) elif srvCode in [132, 133, 134, 135, 136, 137, 138, 139, 140]: # racing conditions logging.warning("Hit a transient data race condition injecting block %s, %s", name, msg) @@ -766,11 +766,6 @@ def retrieveBlocks(self): elif result["success"] == "check": block = result["name"] self.blocksToCheck.append(block) - else: - logging.error("Error found in multiprocess during process of block %s", result.get('name')) - logging.error(result['error']) - # Continue to the next block - # Block will remain in pending status until it is transferred if loadedBlocks: try: