Skip to content

Commit

Permalink
preserve x bit in manifest file based copy
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Oct 24, 2023
1 parent 2e3102f commit c22e67d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -88,9 +93,14 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
+ "%s, you can specify multi locations split by '',", manifestPath.toString(), manifestReadFs.getUri().toString(),
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
}

CopyManifest.CopyableUnitIterator manifests = null;
List<CopyEntity> copyEntities = Lists.newArrayList();
List<FileStatus> toDelete = Lists.newArrayList();
// Map of ancestor path -> owner/permission, ordered deepest path first so that permissions can be set in depth order.
// Ties on depth are broken by the path string itself: a TreeMap treats two keys whose comparator result is 0 as the
// same key, so a depth-only comparator would collapse every path of equal depth into a single entry.
Map<String, OwnerAndPermission> ancestorOwnerAndPermissions = new TreeMap<>((o1, o2) -> {
  int byDepth =
      Long.compare(o2.chars().filter(ch -> ch == '/').count(), o1.chars().filter(ch -> ch == '/').count());
  return byDepth != 0 ? byDepth : o1.compareTo(o2);
});

try {
long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.manifestReadFs, this.manifestPath);
Expand Down Expand Up @@ -118,6 +128,11 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
CopyableFile copyableFile = copyableFileBuilder.build();
copyableFile.setFsDatasets(srcFs, targetFs);
copyEntities.add(copyableFile);

ancestorOwnerAndPermissions.putAll(
CopyableFile.resolveReplicatedAncestorOwnerAndPermissionsRecursively(srcFs, fileToCopy,
new Path(commonFilesParent), configuration));

if (existOnTarget && srcFile.isFile()) {
// this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
// todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
Expand All @@ -128,6 +143,12 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}

// Register a post-publish step that applies the collected ancestor owner/permission entries on the target
// file system. STOP_ON_ERROR_KEY is set to "true" (presumably aborting the step on the first failure — confirm
// against SetPermissionCommitStep).
Properties props = new Properties();
props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
// Named distinctly from the delete step's local `step` declared in the following if-block: Java does not allow
// a local variable to be redeclared in a nested scope.
CommitStep setPermissionStep = new SetPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), setPermissionStep, 1));

if (!toDelete.isEmpty()) {
//todo: add support sync for empty dir
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,19 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
prePublish.size(), postPublish.size()));

executeCommitSequence(prePublish);

if (hasCopyableFiles(datasetWorkUnitStates)) {
// Targets are always absolute, so we start moving from root (will skip any existing directories).
HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"));
} else {
log.info(String.format("[%s] No copyable files in dataset. Proceeding to postpublish steps.", datasetAndPartition.identifier()));
}
executeCommitSequence(postPublish);

this.fs.delete(datasetWriterOutputPath, true);

long datasetOriginTimestamp = Long.MAX_VALUE;
long datasetUpstreamTimestamp = Long.MAX_VALUE;
Optional<String> fileSetRoot = Optional.<String>absent();
Optional<String> fileSetRoot = Optional.absent();

for (WorkUnitState wus : datasetWorkUnitStates) {
if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
Expand Down Expand Up @@ -300,6 +300,10 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
}
}

// Execute post-publish commit steps after file attributes have been preserved, because some post-publish steps
// (e.g. SetPermissionCommitStep) need to set permissions.
executeCommitSequence(postPublish);

// if there are no valid values for datasetOriginTimestamp and datasetUpstreamTimestamp, use
// something more readable
if (Long.MAX_VALUE == datasetOriginTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public static Path relativizePath(Path fullPath, Path pathPrefix) {
* @return true if possibleAncestor is an ancestor of fullPath.
*/
public static boolean isAncestor(Path possibleAncestor, Path fullPath) {
  // A null path has no ancestors.
  if (fullPath == null) {
    return false;
  }
  // If relativizing against possibleAncestor yields something other than the scheme/authority-less fullPath,
  // the candidate is treated as an ancestor.
  Path remainder = relativizePath(fullPath, possibleAncestor);
  boolean unchangedByRelativization = remainder.equals(getPathWithoutSchemeAndAuthority(fullPath));
  return !unchangedByRelativization;
}

Expand Down

0 comments on commit c22e67d

Please sign in to comment.