Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1938] preserve x bit in manifest file based copy #3804

Merged
merged 3 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -88,9 +93,14 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
+ "%s, you can specify multi locations split by '',", manifestPath.toString(), manifestReadFs.getUri().toString(),
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
}

CopyManifest.CopyableUnitIterator manifests = null;
List<CopyEntity> copyEntities = Lists.newArrayList();
List<FileStatus> toDelete = Lists.newArrayList();
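// Map of ancestor paths to their owner and permission, ordered deepest-first by the number of '/' in the path, so that permissions can be set in order.
// NOTE(review): the comparator looks only at depth, so the TreeMap treats two distinct paths of equal depth as the same key and a later put replaces the earlier value - confirm this is intended.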
Map<String, OwnerAndPermission> ancestorOwnerAndPermissions = new TreeMap<>(
(o1, o2) -> Long.compare(o2.chars().filter(ch -> ch == '/').count(), o1.chars().filter(ch -> ch == '/').count()));

try {
long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.manifestReadFs, this.manifestPath);
Expand Down Expand Up @@ -118,6 +128,13 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
CopyableFile copyableFile = copyableFileBuilder.build();
copyableFile.setFsDatasets(srcFs, targetFs);
copyEntities.add(copyableFile);

Path fromPath = srcFs.getFileStatus(fileToCopy).isDirectory() ? fileToCopy : fileToCopy.getParent();

ancestorOwnerAndPermissions.putAll(
CopyableFile.resolveReplicatedAncestorOwnerAndPermissionsRecursively(srcFs, fromPath,
new Path(commonFilesParent), configuration));

if (existOnTarget && srcFile.isFile()) {
// this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
// todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
Expand All @@ -128,6 +145,12 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}

Properties props = new Properties();
props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
CommitStep setPermissionCommitStep = new SetPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this step? If the dir already exists on dst and the user wants to change the permission for that dir, will it appear in the manifest? If the dir does not exist, shouldn't we directly create the parent dir with the same permissions as the source dir?

Copy link
Contributor Author

@arjun4084346 arjun4084346 Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the dir does not exist, we create it, but we create it with the execute bit set. I want to keep doing that, because the execute bit may be necessary to do the publish.
So, in this PR, I had to add a post-publish step to set the original permission.
Honestly, the use case for this work, unsetting the execute bit, seems quite rare.

copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), setPermissionCommitStep, 1));

if (!toDelete.isEmpty()) {
//todo: add support sync for empty dir
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,19 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
prePublish.size(), postPublish.size()));

executeCommitSequence(prePublish);

if (hasCopyableFiles(datasetWorkUnitStates)) {
// Targets are always absolute, so we start moving from root (will skip any existing directories).
HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"));
} else {
log.info(String.format("[%s] No copyable files in dataset. Proceeding to postpublish steps.", datasetAndPartition.identifier()));
}
executeCommitSequence(postPublish);

this.fs.delete(datasetWriterOutputPath, true);

long datasetOriginTimestamp = Long.MAX_VALUE;
long datasetUpstreamTimestamp = Long.MAX_VALUE;
Optional<String> fileSetRoot = Optional.<String>absent();
Optional<String> fileSetRoot = Optional.absent();

for (WorkUnitState wus : datasetWorkUnitStates) {
if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
Expand Down Expand Up @@ -300,6 +300,10 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
}
}

// execute post publish commit steps after preserving file attributes, because some post publish steps,
// e.g. SetPermissionCommitStep, need to set permissions
executeCommitSequence(postPublish);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We move the path from tmp to dst in this step; I believe it should happen before the file attributes are set. We should make sure that setting the file attributes preserves the right permissions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving from tmp (actually writer.output.dir) to dst is the publish step, right?


// if there are no valid values for datasetOriginTimestamp and datasetUpstreamTimestamp, use
// something more readable
if (Long.MAX_VALUE == datasetOriginTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.data.management.dataset;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -28,22 +29,30 @@
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.google.common.io.Files;

import static org.mockito.Mockito.*;


public class ManifestBasedDatasetFinderTest {
private FileSystem localFs;
private File tmpDir;

public ManifestBasedDatasetFinderTest() throws IOException {
localFs = FileSystem.getLocal(new Configuration());
tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
}

@Test
Expand Down Expand Up @@ -81,7 +90,7 @@ public void testFindFiles() throws IOException, URISyntaxException {
Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(manifestPath));
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString())));
Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
Expand All @@ -96,7 +105,8 @@ public void testFindFiles() throws IOException, URISyntaxException {
new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build());
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 2);
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 post publish step
Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath);
Mockito.verify(manifestReadFs, Mockito.times(1)).getFileStatus(manifestPath);
Mockito.verify(manifestReadFs, Mockito.times(1)).open(manifestPath);
Expand Down
2 changes: 1 addition & 1 deletion gobblin-iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies {
transitive = false
}
testCompile('org.apache.hadoop:hadoop-common:2.6.0')
testImplementation(testFixtures(project(":gobblin-completeness")))
testCompile project(":gobblin-completeness").sourceSets.test.output
testCompile project(path: ':gobblin-modules:gobblin-kafka-common', configuration: 'tests')
testCompile externalDependency.testng
testCompile externalDependency.mockito
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public static Path relativizePath(Path fullPath, Path pathPrefix) {
* @return true if possibleAncestor is an ancestor of fullPath.
*/
public static boolean isAncestor(Path possibleAncestor, Path fullPath) {
  // A null path is reported as having no ancestor.
  if (fullPath == null) {
    return false;
  }

  // possibleAncestor counts as an ancestor exactly when relativizing fullPath against it
  // yields something other than fullPath with its scheme and authority stripped.
  Path relativized = relativizePath(fullPath, possibleAncestor);
  Path withoutSchemeAndAuthority = getPathWithoutSchemeAndAuthority(fullPath);
  boolean unchangedByRelativizing = relativized.equals(withoutSchemeAndAuthority);
  return !unchangedByRelativizing;
}

Expand Down
Loading