Commit

fix logic in data audit
elguardian committed Oct 31, 2023
1 parent 64c031d commit 4084aea
Showing 19 changed files with 138 additions and 73 deletions.
10 changes: 6 additions & 4 deletions data-audit/kogito-addons-data-audit-common/pom.xml
@@ -1,12 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.kie.kogito</groupId>
<groupId>org.kie.kogito</groupId>
<artifactId>data-audit</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
@@ -32,6 +30,10 @@
<groupId>org.kie.kogito</groupId>
<artifactId>jobs-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>jobs-service-internal-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -21,6 +21,7 @@
import java.util.ServiceLoader;

import org.kie.kogito.app.audit.spi.DataAuditStore;
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
@@ -74,6 +75,10 @@ public void storeUserTaskInstanceDataEvent(DataAuditContext context, UserTaskIns

public void storeJobDataEvent(DataAuditContext context, JobCloudEvent<Job> event) {
auditStoreService.storeJobDataEvent(context, event);
}

public void storeJobDataEvent(DataAuditContext context, JobInstanceDataEvent event) {
auditStoreService.storeJobDataEvent(context, event);

}

@@ -61,7 +61,10 @@ private GraphQLSchemaManager() {
runtimeWiringBuilder.scalar(ExtendedScalars.DateTime);
runtimeWiringBuilder.scalar(ExtendedScalars.Json);

ServiceLoader.load(GraphQLSchemaQueryProvider.class).forEach(queryProvider -> {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
classLoader = (classLoader != null) ? classLoader : this.getClass().getClassLoader();

ServiceLoader.load(GraphQLSchemaQueryProvider.class, classLoader).forEach(queryProvider -> {
for (GraphQLSchemaQuery<?> query : queryProvider.queries()) {
runtimeWiringBuilder.type("Query", builder -> builder.dataFetcher(query.name(), query::fetch));
}
@@ -72,7 +75,7 @@ private GraphQLSchemaManager() {
SchemaGenerator schemaGenerator = new SchemaGenerator();
graphQLSchema = schemaGenerator.makeExecutableSchema(typeDefinitionRegistry, runtimeWiring);
} catch (IOException e) {
LOGGER.error("could not find data-audit.graphqls", e);
LOGGER.error("could not find or process data-audit.graphqls", e);
}
}

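The change above passes an explicit class loader to the SPI lookup: the thread context class loader is preferred, with the class's own loader as a fallback when none is set. A minimal sketch of that pattern in isolation (illustrative only, not part of this commit):

import java.util.ServiceLoader;

public final class ServiceLoadingSketch {

    // Prefer the thread context class loader (normally set by the container);
    // fall back to the loader that defined this class when the TCCL is null,
    // e.g. in plain unit tests or early bootstrap code.
    static ClassLoader resolveClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        return (classLoader != null) ? classLoader : ServiceLoadingSketch.class.getClassLoader();
    }

    // ServiceLoader.load(Class) always uses the thread context class loader;
    // passing the resolved loader keeps the lookup working even when the TCCL is unset.
    static <T> ServiceLoader<T> load(Class<T> spi) {
        return ServiceLoader.load(spi, resolveClassLoader());
    }
}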
@@ -19,6 +19,7 @@
package org.kie.kogito.app.audit.spi;

import org.kie.kogito.app.audit.api.DataAuditContext;
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
@@ -59,4 +60,6 @@ public interface DataAuditStore {

void storeJobDataEvent(DataAuditContext context, JobCloudEvent<Job> jobDataEvent);

void storeJobDataEvent(DataAuditContext context, JobInstanceDataEvent event);

}
22 changes: 17 additions & 5 deletions data-audit/kogito-addons-data-audit-jpa-common/pom.xml
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.kie.kogito</groupId>
@@ -9,9 +11,17 @@
<artifactId>kogito-addons-data-audit-jpa-common</artifactId>
<name>Kogito Apps :: Data Audit :: JPA Data Audit Persistence Common</name>
<dependencies>
<dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-events-core</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>jobs-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-data-audit-common</artifactId>
<artifactId>kogito-addons-data-audit-common</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -81,10 +91,12 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemProperties>
<com.arjuna.ats.arjuna.objectstore.objectStoreDir>${project.build.directory}/ObjectStore</com.arjuna.ats.arjuna.objectstore.objectStoreDir>
<com.arjuna.ats.arjuna.objectstore.objectStoreDir>
${project.build.directory}/ObjectStore</com.arjuna.ats.arjuna.objectstore.objectStoreDir>
</systemProperties>
<dependenciesToScan>
<dependency>org.kie.kogito.app.audit:data-audit-service-tck</dependency>
<dependency>
org.kie.kogito.app.audit:data-audit-service-tck</dependency>
</dependenciesToScan>
</configuration>
</plugin>
@@ -18,6 +18,7 @@
*/
package org.kie.kogito.app.audit.jpa;

import java.io.IOException;
import java.net.MalformedURLException;
import java.sql.Timestamp;
import java.util.Date;
@@ -44,6 +45,7 @@
import org.kie.kogito.app.audit.jpa.model.UserTaskInstanceVariableLog;
import org.kie.kogito.app.audit.jpa.model.UserTaskInstanceVariableLog.VariableType;
import org.kie.kogito.app.audit.spi.DataAuditStore;
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
@@ -61,11 +63,13 @@
import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent;
import org.kie.kogito.jobs.service.api.Job;
import org.kie.kogito.jobs.service.api.event.JobCloudEvent;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class JPADataAuditStore implements DataAuditStore {

@@ -75,6 +79,7 @@ public class JPADataAuditStore implements DataAuditStore {

public JPADataAuditStore() {
mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
}

@Override
@@ -335,6 +340,30 @@ public void storeJobDataEvent(DataAuditContext context, JobCloudEvent<Job> jobDa
entityManager.persist(log);
}

@Override
public void storeJobDataEvent(DataAuditContext context, JobInstanceDataEvent jobDataEvent) {

ScheduledJob job = toObject(ScheduledJob.class, jobDataEvent.getData());

JobExecutionLog log = new JobExecutionLog();
log.setCorrelationId(jobDataEvent.getKogitoProcessInstanceId());
log.setJobId(job.getId());
log.setState(job.getStatus().name());
log.setTimestamp(Timestamp.from(jobDataEvent.getTime().toInstant()));

EntityManager entityManager = context.getContext();
entityManager.persist(log);
}

private <T> T toObject(Class<T> clazz, byte[] bytes) {
try {
return clazz.cast(mapper.readValue(bytes, clazz));
} catch (IOException e) {
LOGGER.error("could not convert to json string {}", new String(bytes), e);
return null;
}
}

private String toJsonString(Object data) {
try {
if (data == null) {
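Two details in the JPADataAuditStore change above work together: the ObjectMapper registers JavaTimeModule, which Jackson needs in order to bind java.time types, and the new toObject helper turns the event's raw payload bytes into an object (ScheduledJob in the commit). A small self-contained sketch of that combination, using a hypothetical payload class instead of the real ScheduledJob:

import java.io.IOException;
import java.time.OffsetDateTime;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class PayloadMappingSketch {

    // Hypothetical payload shape, for illustration only.
    public static class JobSnapshot {
        public String id;
        public String status;
        public OffsetDateTime expirationTime;
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        // Without this module, Jackson fails on java.time fields such as OffsetDateTime.
        mapper.registerModule(new JavaTimeModule());

        byte[] payload = "{\"id\":\"job-1\",\"status\":\"SCHEDULED\",\"expirationTime\":\"2023-10-31T10:15:30+01:00\"}"
                .getBytes();
        JobSnapshot job = mapper.readValue(payload, JobSnapshot.class);
        System.out.println(job.id + " " + job.status + " " + job.expirationTime);
    }
}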
@@ -31,7 +31,7 @@
import javax.persistence.TemporalType;

@Entity
@Table(name = "JobExecutionHistory")
@Table(name = "JobExecutionLog")
@SequenceGenerator(name = "jobExecutionHistoryIdSeq", sequenceName = "JOB_EXECUTION_HISTORY_ID_SEQ")
public class JobExecutionLog {

@@ -26,8 +26,8 @@
<named-native-query name="GetAllScheduledJobs" result-set-mapping="JobExecutionTO" >
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.state = 'SCHEDULED'
</query>
</named-native-query>
@@ -36,16 +36,16 @@
<named-native-query name="GetJobById" result-set-mapping="JobExecutionTO" >
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.job_id = :jobId
</query>
</named-native-query>

<named-native-query name="GetJobHistoryById" result-set-mapping="JobExecutionTO" >
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp
FROM JobExecutionHistory o1
FROM JobExecutionLog o1
WHERE o1.job_id = :jobId
ORDER BY o1.timestamp DESC
</query>
@@ -54,7 +54,7 @@
<named-native-query name="GetJobHistoryByCorrelationId" result-set-mapping="JobExecutionTO" >
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp
FROM JobExecutionHistory o1
FROM JobExecutionLog o1
WHERE o1.correlation_id = :correlationId
ORDER BY o1.timestamp DESC
</query>
@@ -63,26 +63,26 @@
<named-native-query name="GetAllPendingJobs" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.state IN ('SCHEDULED', 'RETRY')
</query>
</named-native-query>

<named-native-query name="GetAllEligibleJobsForExecution" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.state IN ('SCHEDULED')
</query>
</named-native-query>

<named-native-query name="GetAllEligibleJobsForRetry" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.state IN ('RETRY', 'ERROR')
</query>
</named-native-query>
@@ -91,8 +91,8 @@
<named-native-query name="GetAllJobs" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL
ORDER BY o1.timestamp DESC
</query>
@@ -101,26 +101,26 @@
<named-native-query name="GetAllCompletedJobs" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.state = 'EXECUTED'
</query>
</named-native-query>

<named-native-query name="GetAllInErrorJobs" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.state = 'ERROR'
</query>
</named-native-query>

<named-native-query name="GetAllCancelledJobs" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.state = 'CANCELED'
</query>
</named-native-query>
@@ -129,8 +129,8 @@
<named-native-query name="GetAllJobsByState" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.state IN (:state)
ORDER BY o1.timestamp DESC
</query>
@@ -139,8 +139,8 @@
<named-native-query name="GetJobByCorrelationId" result-set-mapping="JobExecutionTO">
<query>
SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap
FROM JobExecutionHistory o1
LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
FROM JobExecutionLog o1
LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp &lt; o2.timestamp
WHERE o2.job_id IS NULL AND o1.correlation_id = :correlationId
ORDER BY o1.timestamp DESC
</query>
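The query changes above keep the mapping consistent: the native queries now read from JobExecutionLog, matching the renamed @Table on the entity, and the LEFT JOIN on a later timestamp for the same job_id keeps only the most recent row per job. A hedged usage sketch (not part of this commit; the JobExecutionTO result-set mapping produces a DTO whose type is not shown in this diff, so the result list is left untyped):

import java.util.List;

import javax.persistence.EntityManager;

public class JobQuerySketch {

    // Runs the GetJobHistoryById named native query declared in the orm.xml above:
    // every audit row recorded for the given job, newest first.
    static List<?> jobHistory(EntityManager entityManager, String jobId) {
        return entityManager.createNamedQuery("GetJobHistoryById")
                .setParameter("jobId", jobId)
                .getResultList();
    }
}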
@@ -20,8 +20,8 @@

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

import org.kie.kogito.app.audit.api.DataAuditQueryService;

Expand All @@ -45,7 +45,7 @@ public class GraphQLJPADataAuditRouter {

GraphQLHandler graphQLHandler;

@Inject
@PersistenceContext(unitName = "DataAuditPU")
EntityManager entityManager;

@PostConstruct
@@ -30,6 +30,7 @@
import org.kie.kogito.app.audit.api.DataAuditEventPublisher;
import org.kie.kogito.app.audit.api.DataAuditStoreProxyService;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.jobs.service.api.Job;
@@ -44,7 +45,7 @@ public class QuarkusJPADataAuditEventPublisher implements DataAuditEventPublishe

private DataAuditStoreProxyService proxy;

@PersistenceContext
@PersistenceContext(unitName = "DataAuditPU")
EntityManager entityManager;

public QuarkusJPADataAuditEventPublisher() {
@@ -69,9 +70,13 @@ public void publish(DataEvent<?> event) {
LOGGER.debug("Processing user task instacne event {}", event);
proxy.storeUserTaskInstanceDataEvent(DataAuditContext.newDataAuditContext(entityManager), (UserTaskInstanceDataEvent<?>) event);
return;
} else if (event instanceof JobInstanceDataEvent) {
LOGGER.info("Processing job instance event {}", event);
proxy.storeJobDataEvent(DataAuditContext.newDataAuditContext(entityManager), (JobInstanceDataEvent) event);
return;
}

LOGGER.debug("Discard event {} as class {} is not supported by this", event, event.getClass().getName());
LOGGER.info("Discard event {} as class {} is not supported by this", event, event.getClass().getName());
}

@Override
