-
Notifications
You must be signed in to change notification settings - Fork 108
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add extra protection for T0 to prevent archival of workflows having b…
…locks not yet deleted. Add GetDeletedBlocksByWorkflow DAO to WMBS. Aggregate all results in the DAO per workflowName Add extra protection for T0 at cleanCouchPoller Typo Update docstrings and log messages Remove redundant statements: Remove redundant range() start argument Remove redundant pass statement Remove redundant DISTINCT statement Typo Add CountUndeletedBlocksByWorkflow DAO && Decrease execution complexity in workflows with undeleted blocks check. Change log level to info. remove keynames remapping from GetDeletedBlocksByWorkflow DAO Pylint fixes. Review fixes
- Loading branch information
1 parent
f2aef9c
commit 71d225c
Showing
5 changed files
with
218 additions
and
3 deletions.
There are no files selected for viewing
49 changes: 49 additions & 0 deletions
49
src/python/WMComponent/DBS3Buffer/MySQL/CountUndeletedBlocksByWorkflow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
""" | ||
_CountUndeletedBlocksByWorkflow_ | ||
MySQL implementation of Workflows.CountUndeletedBlocksByWorkflow | ||
Retrieves a list of workflows and the relative undeleted blocks counters. | ||
""" | ||
|
||
|
||
from WMCore.Database.DBFormatter import DBFormatter | ||
|
||
|
||
class CountUndeletedBlocksByWorkflow(DBFormatter):
    """
    Retrieves a list of all workflows and the relative undeleted blocks counters

    Only blocks flagged with dbsbuffer_block.deleted=0 are counted. Because the
    query uses INNER JOINs and filters on that flag before grouping, a workflow
    with no undeleted blocks produces no record at all (rather than a record
    with a zero counter).

    The structure returned (one record per workflow name; the keys follow the
    two columns selected by the query):
        [{'count': 6,
          'name': 'PromptReco_Run351572_HcalNZS_Tier0_REPLAY_2022_ID220531142559_v425_220531_1430'},
         {'count': 8,
          'name': 'PromptReco_Run351572_NoBPTX_Tier0_REPLAY_2022_ID220531142559_v425_220531_1430'},
         ...]
    """
    sql = """
          SELECT
              dbsbuffer_workflow.name,
              COUNT(DISTINCT dbsbuffer_block.blockname) as count
          FROM dbsbuffer_block
          INNER JOIN dbsbuffer_file ON
              dbsbuffer_file.block_id = dbsbuffer_block.id
          INNER JOIN dbsbuffer_workflow ON
              dbsbuffer_workflow.id = dbsbuffer_file.workflow
          WHERE dbsbuffer_block.deleted=0
          GROUP BY
              dbsbuffer_workflow.name
          """

    def execute(self, conn=None, transaction=False, returnCursor=False):
        """
        Executing the current sql query.

        :param conn: A current database connection to be used if existing
        :param transaction: A current database transaction to be used if existing
        :param returnCursor: Accepted for signature compatibility only; it is not
                             used by this DAO
        :return: A list of dictionaries one record for each database line returned
        """
        # The block count is DISTINCT on blockname because the join through
        # dbsbuffer_file yields one row per file, not one row per block.
        results = self.dbi.processData(self.sql, conn=conn,
                                       transaction=transaction)
        dictResults = DBFormatter.formatDict(self, results)
        return dictResults
14 changes: 14 additions & 0 deletions
14
src/python/WMComponent/DBS3Buffer/Oracle/CountUndeletedBlocksByWorkflow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
#!/usr/bin/env python | ||
""" | ||
_CountUndeletedBlocksByWorkflow_ | ||
Oracle implementation of Workflow.CountUndeletedBlocksByWorkflow | ||
""" | ||
|
||
from WMComponent.DBS3Buffer.MySQL.CountUndeletedBlocksByWorkflow import CountUndeletedBlocksByWorkflow as MySQLCountUndeletedBlocksByWorkflow | ||
|
||
|
||
class CountUndeletedBlocksByWorkflow(MySQLCountUndeletedBlocksByWorkflow):
    """
    Retrieves a list of all workflows and the relative undeleted blocks counters

    Oracle flavour of the DAO: it inherits the query and the execute method
    from the MySQL implementation without overriding anything.
    """
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
109 changes: 109 additions & 0 deletions
109
src/python/WMCore/WMBS/MySQL/Workflow/GetDeletedBlocksByWorkflow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
""" | ||
_GetDeletedBlocksByWorkflow_ | ||
MySQL implementation of Workflows.GetDeletedBlocksByWorkflow | ||
Retrieves a list of workflows with lists of deleted and NOT deleted blocks per workflow. | ||
NOTE: This DAO is not used in the production code but is to be used only for debugging purposes | ||
""" | ||
|
||
|
||
from WMCore.Database.DBFormatter import DBFormatter | ||
|
||
|
||
class GetDeletedBlocksByWorkflow(DBFormatter):
    """
    Retrieves a list of all workflows and the relative deleted blocks lists
    """
    sql = """SELECT
                 dbsbuffer_block.blockname,
                 dbsbuffer_block.deleted,
                 wmbs_workflow.name
             FROM dbsbuffer_block
             INNER JOIN dbsbuffer_file ON
                 dbsbuffer_file.block_id = dbsbuffer_block.id
             INNER JOIN dbsbuffer_workflow ON
                 dbsbuffer_workflow.id = dbsbuffer_file.workflow
             INNER JOIN wmbs_workflow ON
                 wmbs_workflow.name = dbsbuffer_workflow.name
             GROUP BY
                 dbsbuffer_block.blockname,
                 dbsbuffer_block.deleted,
                 wmbs_workflow.name
          """

    def format(self, result):
        """
        _format_

        Format the query results into the proper dictionary expected at the upper layer Python code.

        The input should be a list of database objects, each one representing a single line returned
        from the database with key names matching the column names from the sql query.

        The intermediate (not aggregated) result representing the primary database output in python
        is a list of dictionaries, one record per line returned from the database.
        e.g.
            [{'blockname': '/a/b/c#123-qwe',
              'deleted': 0,
              'name': 'WorkflowName'},
             {'blockname': '/a/b/c#456-rty',
              'deleted': 1,
              'name': 'WorkflowName'},
             {'blockname': '/a/b/d#123-asd',
              'deleted': 0,
              'name': 'WorkflowName'}
             ...
            ]

        NOTE:
           * The query groups by all three selected columns, hence it returns a single record per
             distinct (blockname, deleted, name) combination regardless of how many files the joins
             go through, and NO `DISTINCT` requirement in the SELECT statement is needed.
           * A block is not expected to show up with two different values of the deleted flag. No
             cross check is done here though: if it ever happens the block is going to be listed
             both as deleted and as NOT deleted for that workflow.

        This list is then further aggregated by name to produce an aggregated structure per workflow like:
            [{'name': 'WorkflowName',
              'blocksNotDeleted': ['/a/b/c#123-qwe',
                                   '/a/b/d#123-asd'],
              'blocksDeleted': ['/a/b/c#456-rty']
             },
             ...
            ]

        :param result: The result as returned by the mysql query execution.
        :return:       List of dictionaries
        """
        # First reformat the output in a list of dictionaries per DB record
        dictResults = DBFormatter.formatDict(self, result)

        # Now aggregate all blocks per workflow:
        results = {}
        for record in dictResults:
            wfName = record['name']
            wfRecord = results.setdefault(wfName, {'name': wfName,
                                                   'blocksDeleted': [],
                                                   'blocksNotDeleted': []})
            # Any truthy value of the deleted flag marks the block as deleted
            listName = 'blocksDeleted' if record['deleted'] else 'blocksNotDeleted'
            wfRecord[listName].append(record['blockname'])

        # Return a real list as documented: in Python3 dict.values() is only a view,
        # which can neither be indexed nor serialized like a list.
        return list(results.values())

    def execute(self, conn=None, transaction=False, returnCursor=False):
        """
        Executing the current sql query.

        :param conn:         A current database connection to be used if existing
        :param transaction:  A current database transaction to be used if existing
        :param returnCursor: Accepted for signature compatibility only; it is not
                             used by this DAO
        :return:             A list of dictionaries, one aggregated record per workflow
                             (see the format method for the exact structure)
        """
        results = self.dbi.processData(self.sql, conn=conn,
                                       transaction=transaction)

        return self.format(results)
16 changes: 16 additions & 0 deletions
16
src/python/WMCore/WMBS/Oracle/Workflow/GetDeletedBlocksByWorkflow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
#!/usr/bin/env python | ||
""" | ||
_GetDeletedBlocksByWorkflow_ | ||
Oracle implementation of Workflow.GetDeletedBlocksByWorkflow | ||
NOTE: This DAO is not used in the production code but is to be used only for debugging purposes | ||
""" | ||
|
||
from WMCore.WMBS.MySQL.Workflow.GetDeletedBlocksByWorkflow import GetDeletedBlocksByWorkflow as MySQLGetDeletedBlocksByWorkflow | ||
|
||
|
||
class GetDeletedBlocksByWorkflow(MySQLGetDeletedBlocksByWorkflow):
    """
    Oracle flavour of the DAO listing, per workflow, the deleted and the
    NOT deleted blocks. Nothing is overridden here: the query, the result
    formatting and the execute method all come from the MySQL implementation.
    """