Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't share the same DBS object amongst multiple threads #9311

Merged
merged 1 commit into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions src/python/WMCore/WorkQueue/DataLocationMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# PY3
from urllib.parse import urlparse

from WMCore.WorkQueue.WorkQueueUtils import get_dbs
from WMCore.Services.DBS.DBSReader import DBSReader
from WMCore.WorkQueue.DataStructs.ACDCBlock import ACDCBlock

# TODO: Combine with existing dls so DBSreader can do this kind of thing transparently
Expand All @@ -22,29 +22,19 @@


def isGlobalDBS(dbs):
    """
    Receives a DBSReader object and finds out whether it's
    pointing to Global DBS (no matter whether it's the production
    or the pre-production instance).

    :param dbs: a DBSReader-like object exposing a ``dbsURL`` string attribute
    :return: True if the URL points to a cmsweb Global DBS instance
        (``/dbs/prod/global`` or ``/dbs/int/global``), False otherwise
        (including on any error while inspecting the URL).
    """
    try:
        # Decide purely from the URL string - saves a trip to the server.
        # Fragile, but if this URL layout changes many other things break too.
        url = urlparse(dbs.dbsURL)
        if url.hostname.startswith('cmsweb'):
            if url.path.startswith('/dbs/prod/global') or url.path.startswith('/dbs/int/global'):
                return True
    except Exception as ex:
        # Best-effort: log and fall through to False rather than propagate.
        logging.error("Failed to find out whether DBS is Global or not. Error: %s", str(ex))
    return False


def timeFloor(number, interval=UPDATE_INTERVAL_COARSENESS):
Expand Down Expand Up @@ -85,6 +75,10 @@ def __init__(self, **kwargs):
if self.params.get('cric'):
self.cric = self.params['cric']

# save each DBSReader instance in the class object, such that
# the same object is not shared amongst multiple threads
self.dbses = {}

def __call__(self, dataItems, fullResync=False, dbses=None, datasetSearch=False):
result = {}

Expand Down Expand Up @@ -179,7 +173,10 @@ def organiseByDbs(self, dataItems):
# if it is acdc block don't update location. location should be
# inserted when block is queued and not supposed to change
continue
itemsByDbs[get_dbs(item['dbs_url'])].append(item['name'])

if item['dbs_url'] not in self.dbses:
self.dbses[item['dbs_url']] = DBSReader(item['dbs_url'])
itemsByDbs[self.dbses[item['dbs_url']]].append(item['name'])
return itemsByDbs


Expand All @@ -189,16 +186,17 @@ class WorkQueueDataLocationMapper(DataLocationMapper):
def __init__(self, logger, backend, **kwargs):
    """
    Initialize the WorkQueue-backed location mapper.

    :param logger: logger object used by this mapper
    :param backend: WorkQueue backend providing active data / elements
    :param kwargs: configuration parameters forwarded to DataLocationMapper
    """
    self.backend = backend
    self.logger = logger
    # Initialize the base class exactly once, forwarding all configuration.
    # NOTE(review): a duplicate, argument-less base __init__ call was removed
    # here - it re-ran base setup and silently dropped the kwargs.
    super(WorkQueueDataLocationMapper, self).__init__(**kwargs)

def __call__(self, fullResync=False):
dataItems = self.backend.getActiveData()

# fullResync incorrect with multiple dbs's - fix!!!
dataLocations, fullResync = DataLocationMapper.__call__(self, dataItems, fullResync)
dataLocations, fullResync = super(WorkQueueDataLocationMapper, self).__call__(dataItems, fullResync)

# elements with multiple changed data items will fail fix this, or move to store data outside element
for _dbs, dataMapping in dataLocations.items():
for _, dataMapping in dataLocations.items():
modified = []
for data, locations in dataMapping.items():
elements = self.backend.getElementsForData(data)
Expand Down
29 changes: 20 additions & 9 deletions src/python/WMCore/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from WMCore.Database.CMSCouch import CouchInternalServerError, CouchNotFoundError
from WMCore.Services.CRIC.CRIC import CRIC
from WMCore.Services.LogDB.LogDB import LogDB
from WMCore.Services.DBS.DBSReader import DBSReader
from WMCore.Services.PhEDEx.PhEDEx import PhEDEx
from WMCore.Services.ReqMgr.ReqMgr import ReqMgr
from WMCore.Services.RequestDB.RequestDBReader import RequestDBReader
Expand All @@ -36,7 +37,7 @@
from WMCore.WorkQueue.WorkQueueBase import WorkQueueBase
from WMCore.WorkQueue.WorkQueueExceptions import (TERMINAL_EXCEPTIONS, WorkQueueError, WorkQueueNoMatchingElements,
WorkQueueWMSpecError)
from WMCore.WorkQueue.WorkQueueUtils import cmsSiteNames, get_dbs
from WMCore.WorkQueue.WorkQueueUtils import cmsSiteNames


# Convenience constructor functions
Expand All @@ -46,10 +47,6 @@ def globalQueue(logger=None, dbi=None, **kwargs):
"""
defaults = {'PopulateFilesets': False,
'LocalQueueFlag': False,
'SplittingMapping': {'DatasetBlock':
{'name': 'Block',
'args': {}}
},
'TrackLocationOrSubscription': 'location'
}
defaults.update(kwargs)
Expand Down Expand Up @@ -112,8 +109,10 @@ def __init__(self, logger=None, dbi=None, **params):
raise WorkQueueError(msg)
self.params['ParentQueueCouchUrl'] = self.parent_queue.queueUrl

self.params.setdefault("GlobalDBS",
"https://cmsweb.cern.ch/dbs/prod/global/DBSReader")
# save each DBSReader instance in the class object, such that
# the same object is not shared amongst multiple threads
self.dbses = {}

self.params.setdefault('QueueDepth', 1) # when less than this locally
self.params.setdefault('WorkPerCycle', 100)
self.params.setdefault('LocationRefreshInterval', 600)
Expand Down Expand Up @@ -368,10 +367,22 @@ def getWork(self, jobSlots, siteJobCounts, excludeWorkflows=None):
self.logger.info('Injected %s out of %s units into WMBS', len(results), len(matches))
return results

def _getDbs(self, dbsUrl):
    """
    Return a DBSReader instance for the given DBS URL, creating and
    caching it on first use.

    Instances are cached per WorkQueue object (in ``self.dbses``) rather
    than in a module-level map, such that the same DBSReader object is
    not shared amongst multiple threads.

    :param dbsUrl: string with the DBS url
    :return: an instance of DBSReader
    """
    if dbsUrl not in self.dbses:
        # Bug fix: the newly created reader must be stored, otherwise a
        # fresh DBSReader would be constructed on every cache miss and
        # the cache would never populate.
        self.dbses[dbsUrl] = DBSReader(dbsUrl)
    return self.dbses[dbsUrl]

def _getDBSDataset(self, match):
"""Get DBS info for this dataset"""
tmpDsetDict = {}
dbs = get_dbs(match['Dbs'])
dbs = self._getDbs(match['Dbs'])
datasetName = match['Inputs'].keys()[0]

blocks = dbs.listFileBlocks(datasetName, onlyClosedBlocks=True)
Expand Down Expand Up @@ -403,7 +414,7 @@ def _getDBSBlock(self, match, wmspec):
block["Files"] = fileLists
return blockName, block
else:
dbs = get_dbs(match['Dbs'])
dbs = self._getDbs(match['Dbs'])
if wmspec.getTask(match['TaskName']).parentProcessingFlag():
dbsBlockDict = dbs.getFileBlockWithParents(blockName)
elif wmspec.getRequestType() == 'StoreResults':
Expand Down