Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1924 | Add 'assetsCountToPropagate' and 'assetsCountPropagated' in task vertex. #4032

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ on:
- master
- lineageondemand
- makerlogic
- tagpropv1master

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do once all review is complete and we are ready to move to master


jobs:
build:
Expand Down Expand Up @@ -67,7 +68,7 @@ jobs:
- name: Build with Maven
run: |
branch_name=${{ env.BRANCH_NAME }}
if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]]
if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'tagpropv1master' ]]
then
echo "build without dashboard"
chmod +x ./build.sh && ./build.sh build_without_dashboard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ public final class Constants {
public static final String TASK_CLASSIFICATION_ID = encodePropertyKey(TASK_PREFIX + "classificationId");
public static final String TASK_ENTITY_GUID = encodePropertyKey(TASK_PREFIX + "entityGuid");
public static final String TASK_CLASSIFICATION_TYPENAME = encodePropertyKey(TASK_PREFIX + "classificationTypeName");
public static final String TASK_ASSET_COUNT_TO_PROPAGATE = encodePropertyKey(TASK_PREFIX + "assetsCountToPropagate");
public static final String TASK_ASSET_COUNT_PROPAGATED = encodePropertyKey(TASK_PREFIX + "assetsCountPropagated");
public static final String ACTIVE_STATE_VALUE = "ACTIVE";
public static final String TASK_HEADER_ATLAN_AGENT = "x-atlan-agent";
public static final String TASK_HEADER_ATLAN_AGENT_ID = "x-atlan-agent-id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class AtlasPatch implements Serializable {
private long createdTime;
private long updatedTime;
private PatchStatus status;
private long assetsCountToPropagate;
private long assetsCountPropagated;

public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED }

Expand Down Expand Up @@ -157,7 +159,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
    // Must stay in sync with equals(): every compared field participates in the
    // hash, including the two propagation counters added for task progress tracking.
    final Object[] hashedFields = {
            id, description, type, action, updatedBy, createdBy,
            createdTime, updatedTime, status,
            assetsCountToPropagate, assetsCountPropagated
    };
    return Objects.hash(hashedFields);
}

@Override
Expand All @@ -173,6 +175,8 @@ public String toString() {
sb.append(", createdTime=").append(createdTime);
sb.append(", updatedTime=").append(updatedTime);
sb.append(", status=").append(status);
sb.append(", assetsCountToPropagate=").append(assetsCountToPropagate);
sb.append(", assetsCountPropagated=").append(assetsCountPropagated);
sb.append('}');

return sb.toString();
Expand Down
22 changes: 22 additions & 0 deletions intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public static Status from(String s) {
private String classificationId;
private String entityGuid;
private String classificationTypeName;
private Long assetsCountToPropagate;
private Long assetsCountPropagated;

public AtlasTask() {
}
Expand All @@ -111,6 +113,8 @@ public AtlasTask(String type, String createdBy, Map<String, Object> parameters,
this.attemptCount = 0;
this.classificationId = classificationId;
this.entityGuid = entityGuid;
this.assetsCountToPropagate = 0L;
this.assetsCountPropagated = 0L;
}

public String getGuid() {
Expand Down Expand Up @@ -239,6 +243,22 @@ public String getEntityGuid() {
return entityGuid;
}

/**
 * Sets the total number of assets this task is expected to propagate to.
 *
 * @param assetsCountToPropagate total asset count for the task; may be {@code null}
 */
public void setAssetsCountToPropagate(Long assetsCountToPropagate) {
    // Parameter renamed from "assetsCount" to match the field and the sibling
    // setter setAssetsCountPropagated(); no behavior change.
    this.assetsCountToPropagate = assetsCountToPropagate;
}

/**
 * Returns the total number of assets this task is expected to propagate to,
 * or {@code null} if the count has not been set.
 */
public Long getAssetsCountToPropagate() {
    return this.assetsCountToPropagate;
}

/**
 * Sets the number of assets the task has propagated to so far.
 *
 * @param assetsCountPropagated propagated asset count; may be {@code null}
 */
public void setAssetsCountPropagated(Long assetsCountPropagated) {
    this.assetsCountPropagated = assetsCountPropagated;
}

/**
 * Returns the number of assets the task has propagated to so far,
 * or {@code null} if the count has not been set.
 */
public Long getAssetsCountPropagated() {
    return this.assetsCountPropagated;
}

@JsonIgnore
public void start() {
this.setStatus(Status.IN_PROGRESS);
Expand Down Expand Up @@ -270,6 +290,8 @@ public String toString() {
", attemptCount=" + attemptCount +
", errorMessage='" + errorMessage + '\'' +
", status=" + status +
", assetsCountToPropagate=" + assetsCountToPropagate +
", assetsCountPropagated=" + assetsCountPropagated +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ private void initialize(AtlasGraph graph) throws RepositoryException, IndexExcep
createCommonVertexIndex(management, TASK_ENTITY_GUID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_ERROR_MESSAGE, UniqueKind.NONE, String.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ATTEMPT_COUNT, UniqueKind.NONE, Integer.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ASSET_COUNT_TO_PROPAGATE, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ASSET_COUNT_PROPAGATED, UniqueKind.NONE, Long.class, SINGLE, false, false);

createCommonVertexIndex(management, TASK_UPDATED_TIME, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_TIME_TAKEN_IN_SECONDS, UniqueKind.NONE, Long.class, SINGLE, false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@
import org.slf4j.LoggerFactory;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;


import java.util.*;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
Expand Down Expand Up @@ -96,7 +96,7 @@ public abstract class DeleteHandlerV1 {
private final TaskManagement taskManagement;
private final AtlasGraph graph;
private final TaskUtil taskUtil;

private static final int CHUNK_SIZE = AtlasConfiguration.TASKS_GRAPH_COMMIT_CHUNK_SIZE.getInt();

public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete, TaskManagement taskManagement) {
this.typeRegistry = typeRegistry;
Expand Down Expand Up @@ -1220,16 +1220,32 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship
}
}

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, addPropagationsMap.size() + removePropagationsMap.size());

int propagatedCount = 0;
for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex);

addTagPropagation(classificationVertex, entitiesToAddPropagation);
propagatedCount++;
if (propagatedCount == CHUNK_SIZE){
taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount - 1);
propagatedCount = 0;
}
}

for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex);

removeTagPropagation(classificationVertex, entitiesToRemovePropagation);
propagatedCount++;
if (propagatedCount == CHUNK_SIZE){
taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount);
propagatedCount = 0;
}
}
if (propagatedCount != 0){
taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount);
}
} else {
// update blocked propagated classifications only if there is no change is tag propagation (don't update both)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import org.apache.atlas.*;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
Expand Down Expand Up @@ -78,6 +79,7 @@

import javax.inject.Inject;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -94,6 +96,7 @@
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.IN_PROGRESS;
import static org.apache.atlas.model.tasks.AtlasTask.Status.PENDING;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
Expand Down Expand Up @@ -3136,9 +3139,10 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
long classificationEdgeCount = 0;
long classificationEdgeInMemoryCount = 0;
Iterator<AtlasVertex> tagVertices = GraphHelper.getClassificationVertices(graph, classificationName, CLEANUP_BATCH_SIZE);

List<AtlasVertex> tagVerticesProcessed = new ArrayList<>(0);
List<AtlasVertex> currentAssetVerticesBatch = new ArrayList<>(0);

int totalCount = 0;
hr2904 marked this conversation as resolved.
Show resolved Hide resolved
while (tagVertices != null && tagVertices.hasNext()) {
if (cleanedUpCount >= CLEANUP_MAX){
return;
Expand All @@ -3157,6 +3161,8 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
}

int currentAssetsBatchSize = currentAssetVerticesBatch.size();
totalCount += currentAssetsBatchSize;

if (currentAssetsBatchSize > 0) {
LOG.info("To clean up tag {} from {} entities", classificationName, currentAssetsBatchSize);
int offset = 0;
Expand Down Expand Up @@ -3188,17 +3194,20 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
classificationEdgeInMemoryCount = 0;
}
}

try {
AtlasEntity entity = repairClassificationMappings(vertex);
entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications);
} catch (IllegalStateException | AtlasBaseException e) {
e.printStackTrace();
}

}

transactionInterceptHelper.intercept();

offset += CHUNK_SIZE;


} finally {
LOG.info("For offset {} , classificationEdge were : {}", offset, classificationEdgeCount);
classificationEdgeCount = 0;
Expand All @@ -3214,7 +3223,6 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
e.printStackTrace();
}
}
transactionInterceptHelper.intercept();

cleanedUpCount += currentAssetsBatchSize;
currentAssetVerticesBatch.clear();
Expand All @@ -3223,6 +3231,10 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
tagVertices = GraphHelper.getClassificationVertices(graph, classificationName, CLEANUP_BATCH_SIZE);
}

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, totalCount);
taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, totalCount);

transactionInterceptHelper.intercept();
LOG.info("Completed cleaning up classification {}", classificationName);
}

Expand Down Expand Up @@ -3416,6 +3428,7 @@ public void addClassifications(final EntityMutationContext context, String guid,

public List<String> propagateClassification(String entityGuid, String classificationVertexId, String relationshipGuid, Boolean previousRestrictPropagationThroughLineage,Boolean previousRestrictPropagationThroughHierarchy) throws AtlasBaseException {
try {

if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
LOG.error("propagateClassification(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);

Expand Down Expand Up @@ -3459,9 +3472,10 @@ public List<String> propagateClassification(String entityGuid, String classifica
Boolean toExclude = propagationMode == CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ? true:false;
List<AtlasVertex> impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId, edgeLabelsToCheck,toExclude);

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, impactedVertices.size() - 1);

if (CollectionUtils.isEmpty(impactedVertices)) {
LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId);

return null;
}

Expand Down Expand Up @@ -3504,9 +3518,15 @@ public List<String> processClassificationPropagationAddition(List<AtlasVertex> v

propagatedEntitiesGuids.addAll(chunkedPropagatedEntitiesGuids);

transactionInterceptHelper.intercept();

int propagatedAssetsCount = (offset + CHUNK_SIZE >= impactedVerticesSize && impactedVerticesSize == verticesToPropagate.size())
? toIndex - offset - 1 // Subtract 1 for the last chunk
: toIndex - offset;

offset += CHUNK_SIZE;

transactionInterceptHelper.intercept();
taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedAssetsCount);

} while (offset < impactedVerticesSize);
} catch (AtlasBaseException exception) {
Expand All @@ -3516,7 +3536,7 @@ public List<String> processClassificationPropagationAddition(List<AtlasVertex> v
RequestContext.get().endMetricRecord(classificationPropagationMetricRecorder);
}

return propagatedEntitiesGuids;
return propagatedEntitiesGuids;

}

Expand Down Expand Up @@ -4050,6 +4070,9 @@ public void updateClassificationTextPropagation(String classificationVertexId) t
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
LOG.info("Fetched classification : {} ", classification.toString());
List<AtlasVertex> impactedVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex);

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, impactedVertices.size());

LOG.info("impactedVertices : {}", impactedVertices.size());
int batchSize = 100;
for (int i = 0; i < impactedVertices.size(); i += batchSize) {
Expand All @@ -4064,6 +4087,9 @@ public void updateClassificationTextPropagation(String classificationVertexId) t
entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(classification));
}
}

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, end);

transactionInterceptHelper.intercept();
LOG.info("Updated classificationText from {} for {}", i, batchSize);
}
Expand Down Expand Up @@ -4254,6 +4280,8 @@ public void classificationRefreshPropagation(String classificationId) throws Atl
.filter(vertex -> vertex != null)
.collect(Collectors.toList());

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, verticesToRemove.size() + verticesToAddClassification.size());

//Remove classifications from unreachable vertices
processPropagatedClassificationDeletionFromVertices(verticesToRemove, currentClassificationVertex, classification);

Expand Down Expand Up @@ -4331,8 +4359,9 @@ private void processPropagatedClassificationDeletionFromVertices(List<AtlasVerte
List<AtlasEntity> updatedEntities = updateClassificationText(classification, updatedVertices);
entityChangeNotifier.onClassificationsDeletedFromEntities(updatedEntities, Collections.singletonList(classification));

int propagatedAssetsCount = toIndex - offset;
offset += CHUNK_SIZE;

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedAssetsCount);
transactionInterceptHelper.intercept();

} while (offset < propagatedVerticesSize);
Expand All @@ -4350,6 +4379,8 @@ List<String> processClassificationEdgeDeletionInChunk(AtlasClassification classi
int toIndex;
int offset = 0;

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, propagatedEdgesSize);

do {
toIndex = ((offset + CHUNK_SIZE > propagatedEdgesSize) ? propagatedEdgesSize : (offset + CHUNK_SIZE));

Expand All @@ -4365,8 +4396,11 @@ List<String> processClassificationEdgeDeletionInChunk(AtlasClassification classi
deletedPropagationsGuid.addAll(propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList()));
}

int propagatedAssetsCount = toIndex - offset;

offset += CHUNK_SIZE;

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedAssetsCount);
transactionInterceptHelper.intercept();

} while (offset < propagatedEdgesSize);
Expand Down
jnkrmg marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;

import static org.apache.atlas.model.tasks.AtlasTask.Status.*;

import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;

public abstract class ClassificationTask extends AbstractTask {
Expand Down Expand Up @@ -97,24 +98,18 @@ public AtlasTask.Status perform() throws AtlasBaseException {
}

RequestContext.get().setUser(userName, null);

try {
setStatus(IN_PROGRESS);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no need for these changes


run(params);

setStatus(COMPLETE);
} catch (AtlasBaseException e) {
LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);

setStatus(FAILED);

throw e;
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
graph.commit();
}

return getStatus();
}

Expand Down
Loading
Loading