Skip to content

Commit

Permalink
Merge pull request #9311 from amaltaro/fix-9221
Browse files Browse the repository at this point in the history
Don't share the same DBS object amongst multiple threads
  • Loading branch information
amaltaro authored Jul 30, 2019
2 parents 7b41b54 + 943ac2f commit 9b3a3f0
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 36 deletions.
52 changes: 25 additions & 27 deletions src/python/WMCore/WorkQueue/DataLocationMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# PY3
from urllib.parse import urlparse

from WMCore.WorkQueue.WorkQueueUtils import get_dbs
from WMCore.Services.DBS.DBSReader import DBSReader
from WMCore.WorkQueue.DataStructs.ACDCBlock import ACDCBlock

# TODO: Combine with existing dls so DBSreader can do this kind of thing transparently
Expand All @@ -22,29 +22,19 @@


def isGlobalDBS(dbs):
"""Is this the global dbs"""
"""
Receives a DBSReader object and finds out whether it's
pointing to Global DBS (no matter whether it's production
or the pre-production instance).
"""
try:
# try to determine from name - save a trip to server
# fragile but if this url changes many other things will break also...
url = urlparse(dbs.dbs.getServerUrl()) # DBSApi has url not DBSReader
if url.hostname.startswith('cmsweb.cern.ch') and url.path.startswith('/dbs/prod/global'):
return True
info = dbs.dbs.getServerInfo()
if info and info.get('InstanceName') == 'GLOBAL':
return True
return False
except Exception:
# determin whether this is dbs3
dbs.dbs.serverinfo()

# hacky way to check whether it is global or local dbs.
# issue is created, when it is resolved. use serverinfo() for that.
# https://github.com/dmwm/DBS/issues/355
url = dbs.dbs.url
if url.find("/global") != -1:
return True
else:
return False
url = urlparse(dbs.dbsURL)
if url.hostname.startswith('cmsweb'):
if url.path.startswith('/dbs/prod/global') or url.path.startswith('/dbs/int/global'):
return True
except Exception as ex:
logging.error("Failed to find out whether DBS is Global or not. Error: %s", str(ex))
return False


def timeFloor(number, interval=UPDATE_INTERVAL_COARSENESS):
Expand Down Expand Up @@ -85,6 +75,10 @@ def __init__(self, **kwargs):
if self.params.get('cric'):
self.cric = self.params['cric']

# save each DBSReader instance in the class object, such that
# the same object is not shared amongst multiple threads
self.dbses = {}

def __call__(self, dataItems, fullResync=False, dbses=None, datasetSearch=False):
result = {}

Expand Down Expand Up @@ -179,7 +173,10 @@ def organiseByDbs(self, dataItems):
# if it is acdc block don't update location. location should be
# inserted when block is queued and not supposed to change
continue
itemsByDbs[get_dbs(item['dbs_url'])].append(item['name'])

if item['dbs_url'] not in self.dbses:
self.dbses[item['dbs_url']] = DBSReader(item['dbs_url'])
itemsByDbs[self.dbses[item['dbs_url']]].append(item['name'])
return itemsByDbs


Expand All @@ -189,16 +186,17 @@ class WorkQueueDataLocationMapper(DataLocationMapper):
def __init__(self, logger, backend, **kwargs):
self.backend = backend
self.logger = logger
DataLocationMapper.__init__(self, **kwargs)
super(WorkQueueDataLocationMapper, self).__init__(**kwargs)
DataLocationMapper.__init__(self, )

def __call__(self, fullResync=False):
dataItems = self.backend.getActiveData()

# fullResync incorrect with multiple dbs's - fix!!!
dataLocations, fullResync = DataLocationMapper.__call__(self, dataItems, fullResync)
dataLocations, fullResync = super(WorkQueueDataLocationMapper, self).__call__(dataItems, fullResync)

# elements with multiple changed data items will fail fix this, or move to store data outside element
for _dbs, dataMapping in dataLocations.items():
for _, dataMapping in dataLocations.items():
modified = []
for data, locations in dataMapping.items():
elements = self.backend.getElementsForData(data)
Expand Down
29 changes: 20 additions & 9 deletions src/python/WMCore/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from WMCore.Database.CMSCouch import CouchInternalServerError, CouchNotFoundError
from WMCore.Services.CRIC.CRIC import CRIC
from WMCore.Services.LogDB.LogDB import LogDB
from WMCore.Services.DBS.DBSReader import DBSReader
from WMCore.Services.PhEDEx.PhEDEx import PhEDEx
from WMCore.Services.ReqMgr.ReqMgr import ReqMgr
from WMCore.Services.RequestDB.RequestDBReader import RequestDBReader
Expand All @@ -36,7 +37,7 @@
from WMCore.WorkQueue.WorkQueueBase import WorkQueueBase
from WMCore.WorkQueue.WorkQueueExceptions import (TERMINAL_EXCEPTIONS, WorkQueueError, WorkQueueNoMatchingElements,
WorkQueueWMSpecError)
from WMCore.WorkQueue.WorkQueueUtils import cmsSiteNames, get_dbs
from WMCore.WorkQueue.WorkQueueUtils import cmsSiteNames


# Convenience constructor functions
Expand All @@ -46,10 +47,6 @@ def globalQueue(logger=None, dbi=None, **kwargs):
"""
defaults = {'PopulateFilesets': False,
'LocalQueueFlag': False,
'SplittingMapping': {'DatasetBlock':
{'name': 'Block',
'args': {}}
},
'TrackLocationOrSubscription': 'location'
}
defaults.update(kwargs)
Expand Down Expand Up @@ -112,8 +109,10 @@ def __init__(self, logger=None, dbi=None, **params):
raise WorkQueueError(msg)
self.params['ParentQueueCouchUrl'] = self.parent_queue.queueUrl

self.params.setdefault("GlobalDBS",
"https://cmsweb.cern.ch/dbs/prod/global/DBSReader")
# save each DBSReader instance in the class object, such that
# the same object is not shared amongst multiple threads
self.dbses = {}

self.params.setdefault('QueueDepth', 1) # when less than this locally
self.params.setdefault('WorkPerCycle', 100)
self.params.setdefault('LocationRefreshInterval', 600)
Expand Down Expand Up @@ -368,10 +367,22 @@ def getWork(self, jobSlots, siteJobCounts, excludeWorkflows=None):
self.logger.info('Injected %s out of %s units into WMBS', len(results), len(matches))
return results

def _getDbs(self, dbsUrl):
"""
If we have already construct a DBSReader object pointing to
the DBS URL provided, return it. Otherwise, create and return
a new instance.
:param dbsUrl: string with the DBS url
:return: an instance of DBSReader
"""
if dbsUrl in self.dbses:
return self.dbses[dbsUrl]
return DBSReader(dbsUrl)

def _getDBSDataset(self, match):
"""Get DBS info for this dataset"""
tmpDsetDict = {}
dbs = get_dbs(match['Dbs'])
dbs = self._getDbs(match['Dbs'])
datasetName = match['Inputs'].keys()[0]

blocks = dbs.listFileBlocks(datasetName, onlyClosedBlocks=True)
Expand Down Expand Up @@ -403,7 +414,7 @@ def _getDBSBlock(self, match, wmspec):
block["Files"] = fileLists
return blockName, block
else:
dbs = get_dbs(match['Dbs'])
dbs = self._getDbs(match['Dbs'])
if wmspec.getTask(match['TaskName']).parentProcessingFlag():
dbsBlockDict = dbs.getFileBlockWithParents(blockName)
elif wmspec.getRequestType() == 'StoreResults':
Expand Down

0 comments on commit 9b3a3f0

Please sign in to comment.