Skip to content

Commit

Permalink
[core] Fix remove orphan files with data file path directory (#4871)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Jan 9, 2025
1 parent 1f39542 commit 8ca41fc
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableConsumer;
Expand Down Expand Up @@ -252,12 +253,14 @@ protected void collectWithoutDataFileWithManifestFlag(

/** List directories that contains data files and manifest files. */
protected List<Path> listPaimonFileDirs() {
FileStorePathFactory pathFactory = table.store().pathFactory();

List<Path> paimonFileDirs = new ArrayList<>();

paimonFileDirs.add(new Path(location, "manifest"));
paimonFileDirs.add(new Path(location, "index"));
paimonFileDirs.add(new Path(location, "statistics"));
paimonFileDirs.addAll(listFileDirs(location, partitionKeysNum));
paimonFileDirs.add(pathFactory.manifestPath());
paimonFileDirs.add(pathFactory.indexPath());
paimonFileDirs.add(pathFactory.statisticsPath());
paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), partitionKeysNum));

return paimonFileDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@
@ThreadSafe
public class FileStorePathFactory {

public static final String MANIFEST_PATH = "manifest";
public static final String MANIFEST_PREFIX = "manifest-";
public static final String MANIFEST_LIST_PREFIX = "manifest-list-";
public static final String INDEX_MANIFEST_PREFIX = "index-manifest-";

public static final String INDEX_PATH = "index";
public static final String INDEX_PREFIX = "index-";

public static final String STATISTICS_PATH = "statistics";
public static final String STATISTICS_PREFIX = "stat-";

public static final String BUCKET_PATH_PREFIX = "bucket-";

// this is the table schema root path
Expand Down Expand Up @@ -94,6 +105,25 @@ public Path root() {
return root;
}

public Path manifestPath() {
return new Path(root, MANIFEST_PATH);
}

public Path indexPath() {
return new Path(root, INDEX_PATH);
}

public Path statisticsPath() {
return new Path(root, STATISTICS_PATH);
}

public Path dataFilePath() {
if (dataFilePathDirectory != null) {
return new Path(root, dataFilePathDirectory);
}
return root;
}

@VisibleForTesting
public static InternalRowPartitionComputer getPartitionComputer(
RowType partitionType, String defaultPartValue, boolean legacyPartitionName) {
Expand All @@ -103,25 +133,21 @@ public static InternalRowPartitionComputer getPartitionComputer(
}

public Path newManifestFile() {
return new Path(
root + "/manifest/manifest-" + uuid + "-" + manifestFileCount.getAndIncrement());
return toManifestFilePath(
MANIFEST_PREFIX + uuid + "-" + manifestFileCount.getAndIncrement());
}

public Path newManifestList() {
return new Path(
root
+ "/manifest/manifest-list-"
+ uuid
+ "-"
+ manifestListCount.getAndIncrement());
return toManifestListPath(
MANIFEST_LIST_PREFIX + uuid + "-" + manifestListCount.getAndIncrement());
}

public Path toManifestFilePath(String manifestFileName) {
return new Path(root + "/manifest/" + manifestFileName);
return new Path(manifestPath(), manifestFileName);
}

public Path toManifestListPath(String manifestListName) {
return new Path(root + "/manifest/" + manifestListName);
return new Path(manifestPath(), manifestListName);
}

public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {
Expand Down Expand Up @@ -217,17 +243,13 @@ public PathFactory indexManifestFileFactory() {
return new PathFactory() {
@Override
public Path newPath() {
return new Path(
root
+ "/manifest/index-manifest-"
+ uuid
+ "-"
+ indexManifestCount.getAndIncrement());
return toPath(
INDEX_MANIFEST_PREFIX + uuid + "-" + indexManifestCount.getAndIncrement());
}

@Override
public Path toPath(String fileName) {
return new Path(root + "/manifest/" + fileName);
return new Path(manifestPath(), fileName);
}
};
}
Expand All @@ -236,13 +258,12 @@ public PathFactory indexFileFactory() {
return new PathFactory() {
@Override
public Path newPath() {
return new Path(
root + "/index/index-" + uuid + "-" + indexFileCount.getAndIncrement());
return toPath(INDEX_PREFIX + uuid + "-" + indexFileCount.getAndIncrement());
}

@Override
public Path toPath(String fileName) {
return new Path(root + "/index/" + fileName);
return new Path(indexPath(), fileName);
}
};
}
Expand All @@ -251,17 +272,12 @@ public PathFactory statsFileFactory() {
return new PathFactory() {
@Override
public Path newPath() {
return new Path(
root
+ "/statistics/stats-"
+ uuid
+ "-"
+ statsFileCount.getAndIncrement());
return toPath(STATISTICS_PREFIX + uuid + "-" + statsFileCount.getAndIncrement());
}

@Override
public Path toPath(String fileName) {
return new Path(root + "/statistics/" + fileName);
return new Path(statisticsPath(), fileName);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,25 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
}

test("Paimon procedure: remove orphan files with data file path directory") {
sql(s"""
|CREATE TABLE T (id STRING, name STRING)
|USING PAIMON
|TBLPROPERTIES ('primary-key'='id', 'data-file.path-directory'='data')
|""".stripMargin)

sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')")

val table = loadTable("T")
val orphanFile = new Path(table.store().pathFactory().dataFilePath(), ORPHAN_FILE_1)
table.fileIO().tryToWriteAtomic(orphanFile, "b")

Thread.sleep(1000)
val older_than = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
3)
checkAnswer(
sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than')"),
Row(1, 1) :: Nil)
}
}

0 comments on commit 8ca41fc

Please sign in to comment.