Skip to content

Commit

Permalink
Refactor cleanRSE method; list content recursively
Browse files Browse the repository at this point in the history
filterUnmergedFiles method no longer exists

Fix check for isDeletable

Fix key name for dirsDeletedFail

Check if the ctx object exists before freeing it
  • Loading branch information
amaltaro committed Sep 8, 2024
1 parent 1a80ddf commit 00c3d65
Showing 1 changed file with 98 additions and 167 deletions.
265 changes: 98 additions & 167 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import re
import os
import errno
import stat
try:
import gfal2
except ImportError:
Expand Down Expand Up @@ -152,7 +151,6 @@ def __init__(self, msConfig, logger=None):
Functor(self.updateRSETimestamps, start=True, end=False),
Functor(self.consRecordAge),
Functor(self.getUnmergedFiles),
Functor(self.filterUnmergedFiles),
Functor(self.getPfn),
Functor(self.cleanRSE),
Functor(self.updateServiceCounters),
Expand Down Expand Up @@ -306,10 +304,23 @@ def _execute(self, rseList):
def cleanRSE(self, rse):
"""
The method to implement the actual deletion of files for an RSE.
Order of deletion attempts is:
1. top directory
2. list sub-directories and files
3. remove each file (unlink)
4. remove the (now) empty sub-directories
5. try to remove the top directory again
:param rse: MSUnmergedRSE object to be cleaned
:return: The MSUnmergedRSE object
"""
# reset dirs counters
rse['dirs']['deletedSuccess'] = set()
rse['dirs']['deletedFail'] = set()
self.logger.info("Start cleaning files for RSE: %s.", rse['name'])
if not rse['dirs']['toDelete']:
self.logger.info("There is nothing to delete for RSE: %s.", rse['name'])
rse['isClean'] = self._checkClean(rse)
return rse

# Create the gfal2 context object:
try:
Expand All @@ -320,120 +331,80 @@ def cleanRSE(self, rse):
self.logger.exception(msg)
raise MSUnmergedPlineExit(msg) from ex

filesToDeleteCurrRSE = 0

# Start cleaning one directory at a time:
for dirLfn, fileLfnGen in rse['files']['toDelete'].items():
if dirLfn in rse['dirs']['deletedSuccess']:
self.logger.info("RSE: %s, dir: %s already successfully deleted.", rse['name'], dirLfn)
continue

if self.msConfig['limitFilesPerRSE'] < 0 or \
filesToDeleteCurrRSE < self.msConfig['limitFilesPerRSE']:

# Now we consume the rse['files']['toDelete'][dirLfn] generator;
# after that, no values will be left in it. In case we need it again,
# we will have to recreate the filter as we did in self.filterUnmergedFiles().
pfnList = []
if not rse['pfnPrefix']:
# Fall back to calling Rucio on a per directory basis for
# resolving the lfn to pfn mapping
dirPfn = self.rucio.getPFN(rse['name'], dirLfn, operation='delete')[dirLfn]
for fileLfn in fileLfnGen:
fileLfnSuffix = fileLfn.split(dirLfn)[1]
filePfn = dirPfn + fileLfnSuffix
pfnList.append(filePfn)
for idx, dirLfn in enumerate(rse['dirs']['toDelete']):
self.logger.info("Processing directory index %s out of %s", idx, len(rse['dirs']['toDelete']))
# figure out the PFN prefix
dirPfn = rse['pfnPrefix'] + dirLfn
if not self.msConfig['enableRealMode']:
self.logger.info("DRY-RUN: would delete directory PFN: %s for RSE: %s", dirPfn, rse['name'])
else:
# The following two counters track how many files were successfully deleted and
# how many failed during all consecutive attempts/steps of cleaning the current branch.
filesDeletedSuccess = 0
filesDeletedFail = 0

# Initially try to delete the whole directory even before emptying its content:
self.logger.info("Trying to remove the whole directory: %s", dirPfn)
rmdirSuccess = self._rmDir(ctx, dirPfn)

if rmdirSuccess:
self.logger.info("Directory successfully removed: %s", dirPfn)
rse['counters']['dirsDeletedSuccess'] += 1
else:
# Proceed with assembling the full filePfn out of the rse['pfnPrefix'] and the fileLfn
dirPfn = rse['pfnPrefix'] + dirLfn
for fileLfn in fileLfnGen:
filePfn = rse['pfnPrefix'] + fileLfn
pfnList.append(filePfn)

filesToDeleteCurrRSE += len(pfnList)
msg = "\nRSE: %s \nDELETING: %s."
msg += "\nPFN list with: %s entries: \n%s"
self.logger.debug(msg, rse['name'], dirLfn, len(pfnList), twFormat(pfnList, maxLength=4))

if self.msConfig['enableRealMode']:
# The following two bool flags are to track the success for directory removal
# during all consecutive attempts/steps of cleaning the current branch.
rmdirSuccess = False
purgeSuccess = False
filesDeletedSuccess = 0
filesDeletedFail = 0

# Initially try to delete the whole directory even before emptying its content:
self.logger.info("Trying to remove nonempty directory: %s", dirLfn)
rmdirSuccess = self._rmDir(ctx, dirPfn)

# If the directory was considered successfully removed, update the file counters with the length of the directory contents
# If the above operation fails try to execute the directory contents deletion in bulk - full list of files per directory
if rmdirSuccess:
filesDeletedSuccess = len(pfnList)
else:
msg = "Trying to clean the contents of nonempty directory: %s "
msg += "in slices of: %s files"
self.logger.info(msg, dirLfn, self.msConfig["filesToDeleteSliceSize"])
for pfnSlice in list(grouper(pfnList, self.msConfig["filesToDeleteSliceSize"])):
try:
delResult = ctx.unlink(pfnSlice)
# Count all the successfully deleted files (if a deletion was
# successful a value of None is put in the delResult list):
self.logger.debug("RSE: %s, Dir: %s, delResult: %s",
rse['name'], dirLfn, pformat(delResult))
for gfalErr in delResult:
if gfalErr is None:
filesDeletedSuccess += 1
else:
filesDeletedFail += 1
errMessage = os.strerror(gfalErr.code)
rse['counters']['gfalErrors'].setdefault(errMessage, 0)
rse['counters']['gfalErrors'][errMessage] += 1
except Exception as ex:
msg = "Error while cleaning RSE: %s. "
msg += "Will retry in the next cycle. Err: %s"
self.logger.exception(msg, rse['name'], str(ex))

self.logger.info("RSE: %s, Dir: %s, filesDeletedSuccess: %s",
rse['name'], dirLfn, filesDeletedSuccess)

# Now delete the whole branch, which was previously cleaned file by file
# First try to delete the base directory:
rmdirSuccess = self._rmDir(ctx, dirPfn)

# Then if unable to delete the base directory due to nonEmpty err or similar, try with _purgeTree() recursively
if not rmdirSuccess:
self.logger.info("Trying to recursively purge directory: %s:\n", dirLfn)
purgeSuccess = self._purgeTree(ctx, dirPfn)

# Updating the RSE counters with the newly successfully deleted files
rse['counters']['filesDeletedSuccess'] += filesDeletedSuccess
rse['counters']['filesDeletedFail'] += filesDeletedFail

if purgeSuccess or rmdirSuccess:
self.logger.info("Failed to remove the whole directory. Listing its content and deleting file by file.")
listFiles = self._listDir(ctx, dirPfn)
self.logger.info("Starting deletion of %s files:", len(listFiles))
for pfnSlice in list(grouper(listFiles, self.msConfig["filesToDeleteSliceSize"])):
self.logger.info("Executing file slice removal for %s files...", len(pfnSlice))
try:
# returns None if deletion was successful
for resp in ctx.unlink(pfnSlice):
if resp is None:
filesDeletedSuccess += 1
else:
filesDeletedFail += 1
errMessage = os.strerror(resp.code)
rse['counters']['gfalErrors'].setdefault(errMessage, 0)
rse['counters']['gfalErrors'][errMessage] += 1
except Exception as ex:
msg = "Error while cleaning RSE: %s. "
msg += "Will retry in the next cycle. Err: %s"
self.logger.exception(msg, rse['name'], str(ex))

self.logger.info("RSE: %s, Dir: %s, filesDeletedSuccess: %s, filesDeletedFail: %s",
rse['name'], dirLfn, filesDeletedSuccess, filesDeletedFail)

# now reverse engineer the deepest directory names and delete each one of them
setDirPfn = set()
for item in listFiles:
setDirPfn.add(os.path.dirname(item))

for subDir in setDirPfn:
rmdirSuccess = self._rmDir(ctx, subDir)
if rmdirSuccess:
self.logger.info("Sub-directory successfully removed: %s", subDir)
else:
self.logger.info("Sub-directory failed to be removed: %s", subDir)

# lastly, try to delete the original directory
if self._rmDir(ctx, dirPfn):
self.logger.info("Finally, directory successfully removed: %s", dirPfn)
rse['counters']['dirsDeletedSuccess'] += 1
rse['dirs']['deletedSuccess'].add(dirLfn)
rse['counters']['dirsDeletedSuccess'] = len(rse['dirs']['deletedSuccess'])
# if dirLfn in rse['dirs']['toDelete']:
# rse['dirs']['toDelete'].remove(dirLfn)
if dirLfn in rse['dirs']['deletedFail']:
rse['dirs']['deletedFail'].remove(dirLfn)
msg = "RSE: %s Success deleting directory: %s"
self.logger.info(msg, rse['name'], dirLfn)
else:
self.logger.info("Directory still fails to be removed: %s", dirPfn)
rse['counters']['dirsDeletedFail'] += 1
rse['dirs']['deletedFail'].add(dirLfn)
rse['counters']['dirsDeletedFail'] = len(rse['dirs']['deletedFail'])
msg = "RSE: %s Failed to purge directory: %s"
self.logger.error(msg, rse['name'], dirLfn)
else:
msg = "RSE: %s reached limit of files per RSE to be deleted. Skipping directory: %s. It will be retried on the next cycle."
self.logger.warning(msg, rse['name'], dirLfn)

# Updating the RSE counters with the newly successfully deleted files
rse['counters']['filesDeletedSuccess'] += filesDeletedSuccess
rse['counters']['filesDeletedFail'] += filesDeletedFail
rse['isClean'] = self._checkClean(rse)

# Explicitly release all internal resources used by the gfal2 context instance
if ctx:
ctx.free()

return rse

def _rmDir(self, ctx, dirPfn):
Expand All @@ -457,71 +428,27 @@ def _rmDir(self, ctx, dirPfn):
rmdirSuccess = False
return rmdirSuccess


def _purgeTree(self, ctx, baseDirPfn, isDirEntry=False):
def _listDir(self, ctx, dirPfn):
"""
A method to be used for purging the tree below a specific branch.
It deletes every empty directory below that branch + the origin at the end.
:param ctx: The gfal2 context object
:param baseDirPfn: The base entry for starting the recursion
:param isDirEntry: Bool flag to avoid extra `stat` operations
NOTE: When called from inside a recursion, we have already checked if the entry point is a directory
:return: Bool: True if it managed to purge everything, False otherwise
"""
# NOTE: It deletes only directories and does not try to unlink any file.
Recursively lists all files in the given directory and its subdirectories.
# First, test if baseDirPfn is actually a directory entry:
if not isDirEntry:
try:
entryStat = ctx.stat(baseDirPfn)
if not stat.S_ISDIR(entryStat.st_mode):
self.logger.error("The base pfn: %s is not a directory entry.", baseDirPfn)
return False
except gfal2.GError as gfalExc:
if gfalExc.code == errno.ENOENT:
self.logger.warning("MISSING baseDir: %s", baseDirPfn)
return True
else:
self.logger.error("FAILED to open baseDir: %s: gfalException: %s, gfalErrorCode: %s", baseDirPfn, str(gfalExc), gfalExc.code)
return False

# Second, recursively iterate down the tree:
successList = []
:param ctx: Gfal context manager object
:param dirPfn: string with a directory pfn
"""
files = []
try:
dirEntryList = ctx.listdir(baseDirPfn)
except gfal2.GError as gfalExc:
if gfalExc.code == errno.ENOENT:
self.logger.warning("MISSING baseDir: %s", baseDirPfn)
return True
else:
self.logger.error("FAILED to list dirEntry: %s: gfalException: %s, gfalErrorCode: %s", baseDirPfn, str(gfalExc), gfalExc.code)
return False

for dirEntry in dirEntryList:
if dirEntry in ['.', '..']:
continue
dirEntryPfn = baseDirPfn + dirEntry
entryStat = None
try:
entryStat = ctx.stat(dirEntryPfn)
except gfal2.GError as gfalExc:
if gfalExc.code == errno.ENOENT:
self.logger.warning("MISSING dirEntry: %s", dirEntryPfn)
successList.append(True)
for entry in ctx.listdir(dirPfn):
# are there files inside this folder
if entry.endswith(".root"):
files.append(os.path.join(dirPfn, entry))
else:
self.logger.error("FAILED to open dirEntry: %s: gfalException: %s, gfalErrorCode: %s", dirEntryPfn, str(gfalExc), gfalExc.code)
successList.append(False)
continue

if entryStat and stat.S_ISDIR(entryStat.st_mode):
successList.append(self._purgeTree(ctx, dirEntryPfn, isDirEntry=True))

# Finally, remove the baseDir:
self.logger.debug("RM baseDir: %s", baseDirPfn)
success = self._rmDir(ctx, baseDirPfn)
successList.append(success)
# it is a directory. Go deeper another directory level
files.extend(self._listDir(ctx, os.path.join(dirPfn, entry)))
except gfal2.GError as gfalExc:
self.logger.warning("Failed to list directory: %s, gfal code: %s", dirPfn, gfalExc.code)

return all(successList)
self.logger.info("Entries under directory: %s is: %s", dirPfn, len(files))
return files

def _checkClean(self, rse):
"""
Expand Down Expand Up @@ -710,7 +637,7 @@ def _isDeletable(self, dirPath):
return False

# Finally, check against the protected LFNs
return dirPath in self.protectedLFNs
return dirPath not in self.protectedLFNs

def getPfn(self, rse):
"""
Expand All @@ -735,6 +662,10 @@ def getPfn(self, rse):
msg = "Could not establish the correct pfn Prefix for RSE: %s. " % rse['name']
msg += "Will fall back to calling Rucio on a directory basis for lfn to pfn resolution."
self.logger.warning(msg)
if not rse['pfnPrefix']:
msg = f"Failed to resolve PFN from LFN for RSE: {rse['name']}. Will retry later."
raise MSUnmergedPlineExit(msg)

return rse

# @profile
Expand Down

0 comments on commit 00c3d65

Please sign in to comment.