diff --git a/src/python/WMCore/WorkQueue/DataLocationMapper.py b/src/python/WMCore/WorkQueue/DataLocationMapper.py
index e08502bfe6..ee3d3f7a9b 100644
--- a/src/python/WMCore/WorkQueue/DataLocationMapper.py
+++ b/src/python/WMCore/WorkQueue/DataLocationMapper.py
@@ -10,7 +10,7 @@
 # PY3
 from urllib.parse import urlparse
 
-from WMCore.WorkQueue.WorkQueueUtils import get_dbs
+from WMCore.Services.DBS.DBSReader import DBSReader
 from WMCore.WorkQueue.DataStructs.ACDCBlock import ACDCBlock
 
 # TODO: Combine with existing dls so DBSreader can do this kind of thing transparently
@@ -22,29 +22,19 @@
 
 
 def isGlobalDBS(dbs):
-    """Is this the global dbs"""
+    """
+    Receives a DBSReader object and finds out whether it's
+    pointing to Global DBS (no matter whether it's production
+    or the pre-production instance).
+    """
     try:
-        # try to determine from name - save a trip to server - fragile but if this url changes
-        # many other things will break also...
-        url = urlparse(dbs.dbs.getServerUrl())  # DBSApi has url not DBSReader
-        if url.hostname.startswith('cmsweb.cern.ch') and url.path.startswith('/dbs/prod/global'):
-            return True
-        info = dbs.dbs.getServerInfo()
-        if info and info.get('InstanceName') == 'GLOBAL':
-            return True
-        return False
-    except Exception:
-        # determin whether this is dbs3
-        dbs.dbs.serverinfo()
-
-        # hacky way to check whether it is global or local dbs.
-        # issue is created, when it is resolved. use serverinfo() for that.
-        # https://github.com/dmwm/DBS/issues/355
-        url = dbs.dbs.url
-        if url.find("/global") != -1:
-            return True
-        else:
-            return False
+        url = urlparse(dbs.dbsURL)
+        if url.hostname.startswith('cmsweb'):
+            if url.path.startswith('/dbs/prod/global') or url.path.startswith('/dbs/int/global'):
+                return True
+    except Exception as ex:
+        logging.error("Failed to find out whether DBS is Global or not. Error: %s", str(ex))
+    return False
 
 
 def timeFloor(number, interval=UPDATE_INTERVAL_COARSENESS):
@@ -85,6 +75,10 @@ def __init__(self, **kwargs):
         if self.params.get('cric'):
             self.cric = self.params['cric']
 
+        # save each DBSReader instance in the class object, such that
+        # the same object is not shared amongst multiple threads
+        self.dbses = {}
+
     def __call__(self, dataItems, fullResync=False, dbses=None, datasetSearch=False):
         result = {}
 
@@ -179,7 +173,10 @@ def organiseByDbs(self, dataItems):
                 # if it is acdc block don't update location. location should be
                 # inserted when block is queued and not supposed to change
                 continue
-            itemsByDbs[get_dbs(item['dbs_url'])].append(item['name'])
+
+            if item['dbs_url'] not in self.dbses:
+                self.dbses[item['dbs_url']] = DBSReader(item['dbs_url'])
+            itemsByDbs[self.dbses[item['dbs_url']]].append(item['name'])
 
         return itemsByDbs
 
@@ -189,16 +186,16 @@ class WorkQueueDataLocationMapper(DataLocationMapper):
     def __init__(self, logger, backend, **kwargs):
         self.backend = backend
         self.logger = logger
-        DataLocationMapper.__init__(self, **kwargs)
+        super(WorkQueueDataLocationMapper, self).__init__(**kwargs)
 
     def __call__(self, fullResync=False):
         dataItems = self.backend.getActiveData()
 
         # fullResync incorrect with multiple dbs's - fix!!!
-        dataLocations, fullResync = DataLocationMapper.__call__(self, dataItems, fullResync)
+        dataLocations, fullResync = super(WorkQueueDataLocationMapper, self).__call__(dataItems, fullResync)
 
         # elements with multiple changed data items will fail fix this, or move to store data outside element
-        for _dbs, dataMapping in dataLocations.items():
+        for _, dataMapping in dataLocations.items():
             modified = []
             for data, locations in dataMapping.items():
                 elements = self.backend.getElementsForData(data)
diff --git a/src/python/WMCore/WorkQueue/WorkQueue.py b/src/python/WMCore/WorkQueue/WorkQueue.py
index b159b2841c..b031596fac 100644
--- a/src/python/WMCore/WorkQueue/WorkQueue.py
+++ b/src/python/WMCore/WorkQueue/WorkQueue.py
@@ -22,6 +22,7 @@
 from WMCore.Database.CMSCouch import CouchInternalServerError, CouchNotFoundError
 from WMCore.Services.CRIC.CRIC import CRIC
 from WMCore.Services.LogDB.LogDB import LogDB
+from WMCore.Services.DBS.DBSReader import DBSReader
 from WMCore.Services.PhEDEx.PhEDEx import PhEDEx
 from WMCore.Services.ReqMgr.ReqMgr import ReqMgr
 from WMCore.Services.RequestDB.RequestDBReader import RequestDBReader
@@ -36,7 +37,7 @@
 from WMCore.WorkQueue.WorkQueueBase import WorkQueueBase
 from WMCore.WorkQueue.WorkQueueExceptions import (TERMINAL_EXCEPTIONS, WorkQueueError, WorkQueueNoMatchingElements,
                                                   WorkQueueWMSpecError)
-from WMCore.WorkQueue.WorkQueueUtils import cmsSiteNames, get_dbs
+from WMCore.WorkQueue.WorkQueueUtils import cmsSiteNames
 
 
 # Convenience constructor functions
@@ -46,10 +47,6 @@ def globalQueue(logger=None, dbi=None, **kwargs):
     """
     defaults = {'PopulateFilesets': False,
                 'LocalQueueFlag': False,
-                'SplittingMapping': {'DatasetBlock':
-                                         {'name': 'Block',
-                                          'args': {}}
-                                     },
                 'TrackLocationOrSubscription': 'location'
                 }
     defaults.update(kwargs)
@@ -112,8 +109,10 @@ def __init__(self, logger=None, dbi=None, **params):
                 raise WorkQueueError(msg)
             self.params['ParentQueueCouchUrl'] = self.parent_queue.queueUrl
 
-        self.params.setdefault("GlobalDBS",
-                               "https://cmsweb.cern.ch/dbs/prod/global/DBSReader")
+        # save each DBSReader instance in the class object, such that
+        # the same object is not shared amongst multiple threads
+        self.dbses = {}
+
         self.params.setdefault('QueueDepth', 1)  # when less than this locally
         self.params.setdefault('WorkPerCycle', 100)
         self.params.setdefault('LocationRefreshInterval', 600)
@@ -368,10 +367,22 @@ def getWork(self, jobSlots, siteJobCounts, excludeWorkflows=None):
         self.logger.info('Injected %s out of %s units into WMBS', len(results), len(matches))
         return results
 
+    def _getDbs(self, dbsUrl):
+        """
+        If we have already constructed a DBSReader object pointing to
+        the DBS URL provided, return it. Otherwise, create, cache and
+        return a new instance.
+        :param dbsUrl: string with the DBS url
+        :return: an instance of DBSReader
+        """
+        if dbsUrl not in self.dbses:
+            self.dbses[dbsUrl] = DBSReader(dbsUrl)
+        return self.dbses[dbsUrl]
+
     def _getDBSDataset(self, match):
         """Get DBS info for this dataset"""
         tmpDsetDict = {}
-        dbs = get_dbs(match['Dbs'])
+        dbs = self._getDbs(match['Dbs'])
         datasetName = match['Inputs'].keys()[0]
 
         blocks = dbs.listFileBlocks(datasetName, onlyClosedBlocks=True)
@@ -403,7 +414,7 @@ def _getDBSBlock(self, match, wmspec):
                 block["Files"] = fileLists
             return blockName, block
         else:
-            dbs = get_dbs(match['Dbs'])
+            dbs = self._getDbs(match['Dbs'])
            if wmspec.getTask(match['TaskName']).parentProcessingFlag():
                 dbsBlockDict = dbs.getFileBlockWithParents(blockName)
             elif wmspec.getRequestType() == 'StoreResults':
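
Note: a minimal standalone sketch of the URL-based Global DBS check introduced in isGlobalDBS above, using only the Python standard library. The helper name is_global_dbs_url and the example URLs are illustrative only and are not part of WMCore; the real function receives a DBSReader object and inspects its dbsURL attribute.

from urllib.parse import urlparse

def is_global_dbs_url(dbs_url):
    """Return True if the url points at a cmsweb Global DBS instance (prod or int)."""
    url = urlparse(dbs_url)
    if url.hostname and url.hostname.startswith('cmsweb'):
        # matches both the production and the pre-production (int) global instances
        return url.path.startswith(('/dbs/prod/global', '/dbs/int/global'))
    return False

# illustrative URLs only
print(is_global_dbs_url("https://cmsweb.cern.ch/dbs/prod/global/DBSReader"))   # True
print(is_global_dbs_url("https://cmsweb.cern.ch/dbs/prod/phys03/DBSReader"))   # False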