Skip to content

Commit

Permalink
Merge pull request #11219 from amaltaro/fix-11163
Browse files Browse the repository at this point in the history
Use Dataset workqueue start policy for workflows processing MINIAODSIM data
  • Loading branch information
amaltaro authored Aug 3, 2022
2 parents 1b8d18e + c0d5ce4 commit a58a84c
Show file tree
Hide file tree
Showing 19 changed files with 445,439 additions and 201,128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def getInputData(self):
Returns the primary dataset name to be locked and
transferred with Rucio, instead of a list of blocks.
:return: a list of unique block names and an integer
:return: a list with the input dataset name and an integer
with their total size
"""
inputContainer = [self.getInputDataset()]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Workflow object representing how MiniAODSIM to NanoAODSIM workflows
have to be dealt with in terms of input data placement (MSTransferor)
"""

from WMCore.MicroService.MSTransferor.DataStructs.Workflow import Workflow
from WMCore.Services.Rucio.RucioUtils import GROUPING_ALL, NUM_COPIES_NANO


class NanoWorkflow(Workflow):
    """
    Workflow specialization for short MiniAODSIM-to-NanoAODSIM workflows,
    defining how MSTransferor handles their input data placement.
    """

    def getInputData(self):
        """
        Resolve the input data to be locked and transferred with Rucio.
        For Nano workflows this is the whole input container name instead
        of a list of block names.
        :return: a list with the input dataset name, and an integer with
            the total size of its primary blocks
        """
        containers = [self.getInputDataset()]
        totalSize = sum(blockData['blockSize'] for blockData in self.getPrimaryBlocks().values())
        # the whole container must be locked, not individual blocks
        return containers, totalSize

    def getRucioGrouping(self):
        """
        Rucio rule grouping to be used for Nano workflows.
        :return: a string with the required DID grouping
        """
        return GROUPING_ALL

    def getReplicaCopies(self):
        """
        Number of replica copies to be set in Rucio rules created for
        Nano workflows (overrides the base class default).
        :return: an integer with the number of copies
        """
        return NUM_COPIES_NANO
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from copy import copy, deepcopy
from WMCore.DataStructs.LumiList import LumiList
from WMCore.MicroService.Tools.Common import getMSLogger
from WMCore.Services.Rucio.RucioUtils import GROUPING_DSET, GROUPING_ALL
from WMCore.Services.Rucio.RucioUtils import GROUPING_DSET, GROUPING_ALL, NUM_COPIES_DEFAULT


class Workflow(object):
Expand Down Expand Up @@ -422,3 +422,12 @@ def getRucioGrouping(self):
if self.getParentDataset():
return GROUPING_ALL
return GROUPING_DSET

def getReplicaCopies(self):
    """
    Number of replica copies to be set in a Rucio replication rule
    created for this workflow. Subclasses may override this to request
    a different number of copies.
    :return: an integer with the number of copies
    """
    numCopies = NUM_COPIES_DEFAULT
    return numCopies
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ def makeTransferRequest(self, wflow):
if dataIn["type"] == "primary":
dids, didsSize = wflow.getInputData()
grouping = wflow.getRucioGrouping()
copies = 1
copies = wflow.getReplicaCopies()
if not dids:
# no valid files in any blocks, it will likely fail in global workqueue
self.logger.warning(" found 0 primary/parent blocks for dataset: %s, moving on...", dataIn['name'])
Expand Down
21 changes: 21 additions & 0 deletions src/python/WMCore/MicroService/MSTransferor/RequestInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from WMCore.DataStructs.LumiList import LumiList
from WMCore.MicroService.MSTransferor.DataStructs.DQMHarvestWorkflow import DQMHarvestWorkflow
from WMCore.MicroService.MSTransferor.DataStructs.GrowingWorkflow import GrowingWorkflow
from WMCore.MicroService.MSTransferor.DataStructs.NanoWorkflow import NanoWorkflow
from WMCore.MicroService.MSTransferor.DataStructs.RelValWorkflow import RelValWorkflow
from WMCore.MicroService.MSTransferor.DataStructs.Workflow import Workflow
from WMCore.MicroService.Tools.PycurlRucio import (getRucioToken, getPileupContainerSizesRucio,
Expand All @@ -28,6 +29,24 @@
from WMCore.MicroService.MSCore import MSCore


def isNanoWorkflow(reqDict):
    """
    Inspect a request dictionary and decide whether it corresponds to a
    MiniAODSIM-to-NanoAODSIM workflow (i.e., its first task/step reads
    a MINIAODSIM input dataset).
    :param reqDict: dictionary with the workflow description
    :return: a boolean, True if the workflow is Nano, False otherwise
    """
    # only TaskChain/StepChain requests carry the first-task input dataset
    firstTaskKey = {"TaskChain": "Task1", "StepChain": "Step1"}.get(reqDict['RequestType'])
    if firstTaskKey is None:
        return False
    return reqDict[firstTaskKey].get("InputDataset", "").endswith("/MINIAODSIM")


class RequestInfo(MSCore):
"""
RequestInfo class provides functionality to access and
Expand Down Expand Up @@ -87,6 +106,8 @@ def classifyWorkflows(self, reqRecords):
wflow = DQMHarvestWorkflow(record['RequestName'], record, logger=self.logger)
elif record.get("OpenRunningTimeout", 0) > self.openRunning:
wflow = GrowingWorkflow(record['RequestName'], record, logger=self.logger)
elif isNanoWorkflow(record):
wflow = NanoWorkflow(record['RequestName'], record, logger=self.logger)
else:
wflow = Workflow(record['RequestName'], record, logger=self.logger)

Expand Down
3 changes: 3 additions & 0 deletions src/python/WMCore/Services/Rucio/RucioUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# https://github.com/rucio/rucio/blob/master/lib/rucio/common/schema/cms.py#L117
GROUPING_DSET = "DATASET"
GROUPING_ALL = "ALL"
# number of copies to be defined when creating replication rules
NUM_COPIES_DEFAULT = 1
NUM_COPIES_NANO = 2


def validateMetaData(did, metaDict, logger):
Expand Down
30 changes: 21 additions & 9 deletions src/python/WMCore/WMSpec/StdSpecs/StepChain.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
checkMemCore, checkEventStreams, checkTimePerEvent)
from WMCore.WMSpec.WMSpecErrors import WMSpecFactoryException

# simple utils for data mining the request dictionary
isGenerator = lambda args: not args["Step1"].get("InputDataset", None)


class StepChainWorkloadFactory(StdBase):
"""
Expand Down Expand Up @@ -92,17 +89,16 @@ def __call__(self, workloadName, arguments):
self.createStepMappings(arguments)

# Create a proper task and set workload level arguments
if isGenerator(arguments):
startPolicy = self.decideWorkQueueStartPolicy(arguments)
self.workload.setWorkQueueSplitPolicy(startPolicy, taskConf['SplittingAlgo'],
taskConf['SplittingArguments'],
OpenRunningTimeout=self.openRunningTimeout)
if startPolicy == "MonteCarlo":
self.workload.setDashboardActivity("production")
self.workload.setWorkQueueSplitPolicy("MonteCarlo", taskConf['SplittingAlgo'],
taskConf['SplittingArguments'])
self.workload.setEndPolicy("SingleShot")
self.setupGeneratorTask(firstTask, taskConf)
else:
self.workload.setDashboardActivity("processing")
self.workload.setWorkQueueSplitPolicy("Block", taskConf['SplittingAlgo'],
taskConf['SplittingArguments'],
OpenRunningTimeout=self.openRunningTimeout)
self.setupTask(firstTask, taskConf)

# Now modify this task to add the next steps
Expand Down Expand Up @@ -614,3 +610,19 @@ def validateStep(self, taskConf, taskArgumentDefinition):
self.raiseValidationException(str(ex))

return

def decideWorkQueueStartPolicy(self, reqDict):
    """
    Given a creation request dictionary, decide which WorkQueue start
    policy the request has to use: generator requests (no input dataset
    in Step1) use "MonteCarlo", MINIAODSIM input uses "Dataset", and any
    other input dataset uses "Block".
    :param reqDict: a dictionary with the creation request information
    :return: a string with the start policy to be used
    """
    inputDset = reqDict["Step1"].get("InputDataset")
    if not inputDset:
        # no input data: events are generated from scratch
        return "MonteCarlo"
    return "Dataset" if inputDset.endswith("/MINIAODSIM") else "Block"
33 changes: 23 additions & 10 deletions src/python/WMCore/WMSpec/StdSpecs/TaskChain.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
#
# simple utils for data mining the request dictionary
#
isGenerator = lambda args: not args["Task1"].get("InputDataset", None)
parentTaskModule = lambda args: args.get("InputFromOutputModule", None)


Expand Down Expand Up @@ -265,20 +264,19 @@ def __call__(self, workloadName, arguments):

if i == 1:
# First task will either be generator or processing
self.workload.setDashboardActivity("relval")
if isGenerator(arguments):
startPolicy = self.decideWorkQueueStartPolicy(arguments)
self.workload.setWorkQueueSplitPolicy(startPolicy, taskConf['SplittingAlgo'],
taskConf['SplittingArguments'],
blowupFactor=blowupFactor,
OpenRunningTimeout=self.openRunningTimeout)
if startPolicy == "MonteCarlo":
# generate mc events
self.workload.setWorkQueueSplitPolicy("MonteCarlo", taskConf['SplittingAlgo'],
taskConf['SplittingArguments'],
blowupFactor=blowupFactor)
self.workload.setDashboardActivity("production")
self.workload.setEndPolicy("SingleShot")
self.setupGeneratorTask(task, taskConf)
else:
# process an existing dataset
self.workload.setWorkQueueSplitPolicy("Block", taskConf['SplittingAlgo'],
taskConf['SplittingArguments'],
blowupFactor=blowupFactor,
OpenRunningTimeout=self.openRunningTimeout)
self.workload.setDashboardActivity("processing")
self.setupTask(task, taskConf)
else:
# all subsequent tasks have to be processing tasks
Expand Down Expand Up @@ -762,3 +760,18 @@ def validateTask(self, taskConf, taskArgumentDefinition):
except Exception as ex:
self.raiseValidationException(str(ex))

def decideWorkQueueStartPolicy(self, reqDict):
    """
    Given a creation request dictionary, decide which WorkQueue start
    policy the request has to use: generator requests (no input dataset
    in Task1) use "MonteCarlo", MINIAODSIM input uses "Dataset", and any
    other input dataset uses "Block".
    :param reqDict: a dictionary with the creation request information
    :return: a string with the start policy to be used
    """
    inputDset = reqDict["Task1"].get("InputDataset")
    if not inputDset:
        # no input data: events are generated from scratch
        return "MonteCarlo"
    return "Dataset" if inputDset.endswith("/MINIAODSIM") else "Block"
17 changes: 10 additions & 7 deletions src/python/WMCore/WMSpec/WMWorkload.py
Original file line number Diff line number Diff line change
Expand Up @@ -1214,13 +1214,16 @@ def setMergeParameters(self, minSize, maxSize, maxEvents,

def setWorkQueueSplitPolicy(self, policyName, splitAlgo, splitArgs, **kwargs):
"""
_setWorkQueueSplitPolicy_
Set the WorkQueue split policy.
policyName should be either 'DatasetBlock', 'Dataset', 'MonteCarlo' 'Block'
different policy could be added in the workqueue plug in.
Additionally general parameters can be specified, these are not mapped and passed directly to the startPolicyArgs,
also record the splitting algorithm in case the WorkQUeue policy needs it.
Sets the WorkQueue start policy.
:param policyName: string with the policy name. Supported values should match the
WMCore/WorkQueue/Policy/Start module names (Dataset, Block, MonteCarlo or ResubmitBlock)
:param splitAlgo: string with the job splitting algorithm name. Supported values should
match the WMCore/JobSplitting module names.
:param splitArgs: dictionary with job splitting arguments
:param kwargs: dictionary with additional arguments that can be passed directly to
the startPolicyArgs
:return: None
"""
SplitAlgoToStartPolicy = {"FileBased": ["NumberOfFiles"],
"EventBased": ["NumberOfEvents",
Expand Down
Loading

0 comments on commit a58a84c

Please sign in to comment.