[spark] Clean empty directory after removing orphan files #4824
base: master
Conversation
@@ -137,6 +138,7 @@ case class SparkOrphanFilesClean(
        it =>
          var deletedFilesCount = 0L
          var deletedFilesLenInBytes = 0L
+         val involvedDirectories = new ArrayBuffer[String]()
Should this be a Set, to deduplicate the directories?
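For illustration, a minimal sketch of the suggested change, keeping the variable name from the diff above (the recording step is hypothetical):

```scala
import scala.collection.mutable

// A Set deduplicates parent directories as they are recorded, so each
// directory is considered at most once in the empty-directory sweep later.
val involvedDirectories = mutable.HashSet.empty[String]

// hypothetical recording step inside the per-file deletion loop:
// involvedDirectories += deletedFilePath.getParent.toUri.toString
```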
            deletedFilesCount += 1
          }
          logInfo(
            s"Total cleaned files: $deletedFilesCount, Total cleaned files len : $deletedFilesLenInBytes")
-         Iterator.single((deletedFilesCount, deletedFilesLenInBytes))
+         Iterator.single((deletedFilesCount, deletedFilesLenInBytes, involvedDirectories))
Could we deduplicate the directories and clean them concurrently?
@@ -91,6 +95,10 @@ public abstract class OrphanFilesClean implements Serializable {
    protected final int partitionKeysNum;
    protected final Path location;

+   private static final String THREAD_NAME = "ORPHAN-FILES-CLEAN-THREAD-POOL";
+   private static final ThreadPoolExecutor executorService =
Do not introduce another thread pool; for LocalOrphanFilesClean, you should just use the executor already in it.
    // clean empty directories
    val deletedPaths =
      deleted.flatMap { case (_, _, paths) => paths }.collect().map(new Path(_)).toSet
    cleanEmptyDirectory(deletedPaths.asJava)
You should use a distributed Spark RDD or DataFrame to clean them.
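A hedged sketch of that suggestion, assuming `deleted` is the Dataset from the diff above, the implicit encoders of SparkOrphanFilesClean are in scope, and `tryDeleteEmptyDirectory` is the per-path helper seen later in this thread; the wiring is illustrative, not the final implementation:

```scala
// Clean candidate directories on the executors instead of collecting
// every path to the driver: deduplicate, then let each partition
// delete its own share of directories.
deleted
  .flatMap { case (_, _, paths) => paths }
  .distinct()
  .foreachPartition { paths: Iterator[String] =>
    paths.foreach(p => tryDeleteEmptyDirectory(new Path(p)))
  }
```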
    randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, deletedPaths);

    for (int level = 0; level < partitionKeysNum; level++) {
What is the `level` used for?
When the empty bucket directories are cleaned, the parent partition directories should also be cleaned if they are empty.
cc @Zouxxyy
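A minimal sketch of that upward sweep, reusing names from the diffs in this thread (`partitionKeysNum`, `tryDeleteEmptyDirectory`); `deletedBucketDirs` is a hypothetical set of the bucket directories that were just removed:

```scala
// Walk up one level per partition key: a partition directory can only
// become empty after its child bucket (or sub-partition) directories
// are gone, so sweep bottom-up, level by level. tryDeleteEmptyDirectory
// is assumed to delete a directory only when it is actually empty.
var candidates: Set[Path] = deletedBucketDirs.map(_.getParent)
for (_ <- 0 until partitionKeysNum) {
  candidates.foreach(tryDeleteEmptyDirectory)
  candidates = candidates.flatMap(p => Option(p.getParent))
}
```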
      }
      .cache()
Can we avoid caching here?
This dataset needs to be used twice, so the cache currently cannot be removed. Do you have a better way?
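For context, a hedged reconstruction of the two uses from the diffs in this thread, assuming the implicit tuple encoders are in scope; the exact shapes may differ in the PR:

```scala
// `deleted` feeds two separate Spark actions; without cache() the
// whole orphan-file scan would be recomputed for the second one.
val cached = deleted.cache()

// use 1: aggregate the deleted-file statistics
val (count, bytes) = cached
  .map { case (c, b, _) => (c, b) }
  .reduce((l, r) => (l._1 + r._1, l._2 + r._2))

// use 2: the involved directories for the empty-directory sweep
val dirs = cached.flatMap { case (_, _, paths) => paths }.distinct()
```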
@Zouxxyy Thanks very much for your review; I have adjusted the code. Could you take another look when you are free?
Force-pushed from c19c457 to 28e50eb
The overall implementation idea is already quite clear: in each task, clean the empty directories of the involved bucket set.
And remember to clean up any unused code, such as "ORPHAN-FILES-CLEAN-THREAD-POOL", etc.
        return new CleanOrphanFilesResult(
                deleteFiles.size(), deletedFilesLenInBytes.get(), deleteFiles);
    }

    private void cleanEmptyDirectory(List<Path> deleteFiles) {
- Extract a common method that can be used by core, spark, and so on. Note: the bucket and partition logic can be processed together, as long as we set level = partitionKeysNum + 1. (See the sketch after this list.)
- Let the entire method be executed through randomlyOnlyExecute.
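One possible reading of these two points, sketched with names taken from the diffs above (`randomlyOnlyExecute`, `tryDeleteEmptyDirectory`, `partitionKeysNum`); the exact signature of `randomlyOnlyExecute` and the placement of the helper are assumptions, not the final code:

```scala
import scala.collection.JavaConverters._

// Hypothetical shared helper: treat the bucket directories as one extra
// level below the partitions, so a single loop of partitionKeysNum + 1
// iterations sweeps buckets and every partition level bottom-up.
def cleanEmptyDirectories(involvedDirs: Set[Path], partitionKeysNum: Int): Unit = {
  var current = involvedDirs
  for (_ <- 0 to partitionKeysNum) {
    // spread the per-directory deletions across the existing executor,
    // as suggested, rather than a dedicated thread pool
    randomlyOnlyExecute(executor, (p: Path) => tryDeleteEmptyDirectory(p), current.asJava)
    current = current.flatMap(p => Option(p.getParent))
  }
}
```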
Purpose

Empty directories cannot be cleaned after executing remove_orphan_files.

Tests

API and Format

Documentation