-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DG-1924 | Add 'assetsCountToPropagate' and 'assetsCountPropagated' in task vertex. #4032
base: master
Are you sure you want to change the base?
Conversation
bc0322a
to
cd6f2a3
Compare
...itory/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java
Outdated
Show resolved
Hide resolved
e3485d8
to
a118278
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
.github/workflows/maven.yml
Outdated
@@ -28,6 +28,8 @@ on: | |||
- master | |||
- lineageondemand | |||
- makerlogic | |||
- taskdg1924deleteprop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove these
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
2ea2057
to
7ff6994
Compare
ed9f39a
to
398085d
Compare
intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
Outdated
Show resolved
Hide resolved
RequestContext.get().setCurrentTask(task); | ||
|
||
task.setStartTime(new Date()); | ||
|
||
task.setAssetsCountToPropagate(runnableTask.getAssetsCountToPropagate()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need to pass runnable task here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the task goes into progress the existing code updates the startTime and a few other details about the task. In the same scenario I needed to update value for assetsCountToPropagate
with the number of tasks and for that I need the runnable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure you have the real number here to set assetsCountToPropagate
?
IMO here you will always get it as 0, as per previous code I saw while creating Task vertex we already write value as 0, I feel this is not needed
//update the 'assetsCountToPropagate' in the current task vertex. | ||
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); | ||
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); | ||
graph.commit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't use graph.commit()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to commit here given we will be committing to the graph just after 2 line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrote a new function updateTaskVertexProperty
that is used everywhere and it doesn't have graph.commit()
in it.
//update the 'assetsCountToPropagate' in the current task vertex. | ||
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); | ||
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); | ||
graph.commit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove graph.commit()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrote a new function updateTaskVertexProperty
that is used everywhere and it doesn't have graph.commit()
in it.
@@ -3460,9 +3481,16 @@ public List<String> propagateClassification(String entityGuid, String classifica | |||
Boolean toExclude = propagationMode == CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ? true:false; | |||
List<AtlasVertex> impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId, edgeLabelsToCheck,toExclude); | |||
|
|||
// update the 'assetsCountToPropagate' on in memory java object. | |||
AtlasTask currentTask = RequestContext.get().getCurrentTask(); | |||
currentTask.setAssetsCountToPropagate((long) impactedVertices.size() - 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a method to update the task vertex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrote a new function updateTaskVertexProperty
that is used everywhere and it doesn't have graph.commit()
in it.
//update the 'assetsCountToPropagate' in the current task vertex. | ||
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); | ||
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); | ||
graph.commit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove graph.commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrote a new function updateTaskVertexProperty
that is used everywhere and it doesn't have graph.commit()
in it.
@@ -4351,6 +4416,15 @@ List<String> processClassificationEdgeDeletionInChunk(AtlasClassification classi | |||
int toIndex; | |||
int offset = 0; | |||
|
|||
// update the 'assetsCountToPropagate' on in memory java object. | |||
AtlasTask currentTask = RequestContext.get().getCurrentTask(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
too much code repetition, create a generic method for updating it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrote a new function updateTaskVertexProperty
that is used everywhere and it doesn't have graph.commit()
in it.
@@ -100,9 +101,7 @@ public AtlasTask.Status perform() throws AtlasBaseException { | |||
|
|||
try { | |||
setStatus(IN_PROGRESS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: no need for this changes
@@ -361,8 +363,8 @@ public List<AtlasTask> getTasksForReQueueGraphQuery() { | |||
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME); | |||
|
|||
List<AtlasGraphQuery> orConditions = new LinkedList<>(); | |||
orConditions.add(query.createChildQuery().has(Constants.TASK_STATUS, AtlasTask.Status.IN_PROGRESS)); | |||
orConditions.add(query.createChildQuery().has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)); | |||
orConditions.add(query.createChildQuery().has(TASK_STATUS, AtlasTask.Status.IN_PROGRESS)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reason for these changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have reverted these back
@@ -28,6 +28,7 @@ on: | |||
- master | |||
- lineageondemand | |||
- makerlogic | |||
- tagpropv1master |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do once all review is complete and we are ready to move to master
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly reviews are for code quality
removed madu from review because of new team structure |
…k vertex. The former will be updated with the total count of propagations to be done once the planning phase is complete and the task begins execution. The latter will be updated as the task progresses, reflecting the count of completed propagations at any given point.
…'refresh prop' and 'cleanup prop'
dbae477
to
164ec29
Compare
@@ -55,6 +61,21 @@ public class TaskManagement implements Service, ActiveStateChangeHandler { | |||
private final RedisService redisService; | |||
private Thread watcherThread = null; | |||
|
|||
public void updateTaskVertexProperty(String propertyKey, AtlasGraph graph, long value, boolean isIncremental, BiConsumer<AtlasTask, Long> taskSetter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks fine, just add few descriptions how we are using BiConsumer, as its bit complicated to understand for reviewer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just add proper description, all looks good now otherwise. Will approve
intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
Outdated
Show resolved
Hide resolved
RequestContext.get().setCurrentTask(task); | ||
|
||
task.setStartTime(new Date()); | ||
|
||
task.setAssetsCountToPropagate(runnableTask.getAssetsCountToPropagate()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure you have the real number here to set assetsCountToPropagate
?
IMO here you will always get it as 0, as per previous code I saw while creating Task vertex we already write value as 0, I feel this is not needed
DG1924
Design plan ref :Execution Plan - Phase v1 - WIP | 1. Task Vertex
Test cases: DG1924 - TestCase
The task for this ticket:
Add 'assetsCountToPropagate' and 'assetsCountPropagated' params to the task vertex and
update 'assetsCountToPropagate' with a count of the assets count to propagate once the planning phase is finished.
update 'assetsCountPropagated' with a count as the task progresses.
Type of change
Related issues
Checklists
Development
Security
Code review