From 4084aea1246c91ba2e412a92cc16a89863e54cbb Mon Sep 17 00:00:00 2001 From: Enrique Gonzalez Martinez Date: Tue, 31 Oct 2023 11:58:54 +0100 Subject: [PATCH] fix logic in data audit --- .../kogito-addons-data-audit-common/pom.xml | 10 ++-- .../audit/api/DataAuditStoreProxyService.java | 5 ++ .../audit/graphql/GraphQLSchemaManager.java | 7 ++- .../kogito/app/audit/spi/DataAuditStore.java | 3 ++ .../pom.xml | 22 +++++++-- .../app/audit/jpa/JPADataAuditStore.java | 29 +++++++++++ .../app/audit/jpa/model/JobExecutionLog.java | 2 +- .../src/main/resources/META-INF/job-orm.xml | 48 +++++++++---------- .../quarkus/GraphQLJPADataAuditRouter.java | 4 +- .../QuarkusJPADataAuditEventPublisher.java | 9 +++- .../src/main/resources/application.properties | 6 +-- .../app/audit/quarkus/DataAuditTestUtils.java | 2 +- ...arkusAuditUserTaskInstanceServiceTest.java | 6 +-- .../src/test/resources/application.properties | 11 ++++- .../SpringBootJPADataAuditEventPublisher.java | 8 ++++ .../UserTaskInstanceStateEventMerger.java | 4 +- .../data-index-storage-postgresql/.gitignore | 1 + .../stream/EventPublisherJobStreams.java | 29 ++++------- .../stream/EventPublisherJobStreamsTest.java | 5 +- 19 files changed, 138 insertions(+), 73 deletions(-) create mode 100644 data-index/data-index-storage/data-index-storage-postgresql/.gitignore diff --git a/data-audit/kogito-addons-data-audit-common/pom.xml b/data-audit/kogito-addons-data-audit-common/pom.xml index 38ce64472a..c88e5b1905 100644 --- a/data-audit/kogito-addons-data-audit-common/pom.xml +++ b/data-audit/kogito-addons-data-audit-common/pom.xml @@ -1,12 +1,10 @@ 4.0.0 - org.kie.kogito + org.kie.kogito data-audit 2.0.0-SNAPSHOT @@ -32,6 +30,10 @@ org.kie.kogito jobs-service-api + + org.kie.kogito + jobs-service-internal-api + org.slf4j slf4j-api diff --git a/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/api/DataAuditStoreProxyService.java b/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/api/DataAuditStoreProxyService.java index 40b90707ab..13a34d0a1e 100644 --- a/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/api/DataAuditStoreProxyService.java +++ b/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/api/DataAuditStoreProxyService.java @@ -21,6 +21,7 @@ import java.util.ServiceLoader; import org.kie.kogito.app.audit.spi.DataAuditStore; +import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; @@ -74,6 +75,10 @@ public void storeUserTaskInstanceDataEvent(DataAuditContext context, UserTaskIns public void storeJobDataEvent(DataAuditContext context, JobCloudEvent event) { auditStoreService.storeJobDataEvent(context, event); + } + + public void storeJobDataEvent(DataAuditContext context, JobInstanceDataEvent event) { + auditStoreService.storeJobDataEvent(context, event); } diff --git a/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/graphql/GraphQLSchemaManager.java b/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/graphql/GraphQLSchemaManager.java index 92f084fa98..7d8e66b21f 100644 --- a/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/graphql/GraphQLSchemaManager.java +++ b/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/graphql/GraphQLSchemaManager.java @@ -61,7 +61,10 @@ private GraphQLSchemaManager() { runtimeWiringBuilder.scalar(ExtendedScalars.DateTime); runtimeWiringBuilder.scalar(ExtendedScalars.Json); - ServiceLoader.load(GraphQLSchemaQueryProvider.class).forEach(queryProvider -> { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + classLoader = (classLoader != null) ? classLoader : this.getClass().getClassLoader(); + + ServiceLoader.load(GraphQLSchemaQueryProvider.class, classLoader).forEach(queryProvider -> { for (GraphQLSchemaQuery query : queryProvider.queries()) { runtimeWiringBuilder.type("Query", builder -> builder.dataFetcher(query.name(), query::fetch)); } @@ -72,7 +75,7 @@ private GraphQLSchemaManager() { SchemaGenerator schemaGenerator = new SchemaGenerator(); graphQLSchema = schemaGenerator.makeExecutableSchema(typeDefinitionRegistry, runtimeWiring); } catch (IOException e) { - LOGGER.error("could not find data-audit.graphqls", e); + LOGGER.error("could not find or process data-audit.graphqls", e); } } diff --git a/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/spi/DataAuditStore.java b/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/spi/DataAuditStore.java index 30fa925131..adf4231d17 100644 --- a/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/spi/DataAuditStore.java +++ b/data-audit/kogito-addons-data-audit-common/src/main/java/org/kie/kogito/app/audit/spi/DataAuditStore.java @@ -19,6 +19,7 @@ package org.kie.kogito.app.audit.spi; import org.kie.kogito.app.audit.api.DataAuditContext; +import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; import org.kie.kogito.event.process.ProcessInstanceSLADataEvent; @@ -59,4 +60,6 @@ public interface DataAuditStore { void storeJobDataEvent(DataAuditContext context, JobCloudEvent jobDataEvent); + void storeJobDataEvent(DataAuditContext context, JobInstanceDataEvent event); + } diff --git a/data-audit/kogito-addons-data-audit-jpa-common/pom.xml b/data-audit/kogito-addons-data-audit-jpa-common/pom.xml index b46818db21..4d50970c13 100644 --- a/data-audit/kogito-addons-data-audit-jpa-common/pom.xml +++ b/data-audit/kogito-addons-data-audit-jpa-common/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 org.kie.kogito @@ -9,9 +11,17 @@ kogito-addons-data-audit-jpa-common Kogito Apps :: Data Audit :: JPA Data Audit Persistence Common - + + org.kie.kogito + kogito-events-core + + + org.kie.kogito + jobs-service-api + + org.kie.kogito - kogito-addons-data-audit-common + kogito-addons-data-audit-common com.fasterxml.jackson.core @@ -81,10 +91,12 @@ maven-surefire-plugin - ${project.build.directory}/ObjectStore + + ${project.build.directory}/ObjectStore - org.kie.kogito.app.audit:data-audit-service-tck + + org.kie.kogito.app.audit:data-audit-service-tck diff --git a/data-audit/kogito-addons-data-audit-jpa-common/src/main/java/org/kie/kogito/app/audit/jpa/JPADataAuditStore.java b/data-audit/kogito-addons-data-audit-jpa-common/src/main/java/org/kie/kogito/app/audit/jpa/JPADataAuditStore.java index ce52575d40..7e0b7c7e20 100644 --- a/data-audit/kogito-addons-data-audit-jpa-common/src/main/java/org/kie/kogito/app/audit/jpa/JPADataAuditStore.java +++ b/data-audit/kogito-addons-data-audit-jpa-common/src/main/java/org/kie/kogito/app/audit/jpa/JPADataAuditStore.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.app.audit.jpa; +import java.io.IOException; import java.net.MalformedURLException; import java.sql.Timestamp; import java.util.Date; @@ -44,6 +45,7 @@ import org.kie.kogito.app.audit.jpa.model.UserTaskInstanceVariableLog; import org.kie.kogito.app.audit.jpa.model.UserTaskInstanceVariableLog.VariableType; import org.kie.kogito.app.audit.spi.DataAuditStore; +import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; @@ -61,11 +63,13 @@ import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent; import org.kie.kogito.jobs.service.api.Job; import org.kie.kogito.jobs.service.api.event.JobCloudEvent; +import org.kie.kogito.jobs.service.model.ScheduledJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; public class JPADataAuditStore implements DataAuditStore { @@ -75,6 +79,7 @@ public class JPADataAuditStore implements DataAuditStore { public JPADataAuditStore() { mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); } @Override @@ -335,6 +340,30 @@ public void storeJobDataEvent(DataAuditContext context, JobCloudEvent jobDa entityManager.persist(log); } + @Override + public void storeJobDataEvent(DataAuditContext context, JobInstanceDataEvent jobDataEvent) { + + ScheduledJob job = toObject(ScheduledJob.class, jobDataEvent.getData()); + + JobExecutionLog log = new JobExecutionLog(); + log.setCorrelationId(jobDataEvent.getKogitoProcessInstanceId()); + log.setJobId(job.getId()); + log.setState(job.getStatus().name()); + log.setTimestamp(Timestamp.from(jobDataEvent.getTime().toInstant())); + + EntityManager entityManager = context.getContext(); + entityManager.persist(log); + } + + private T toObject(Class clazz, byte[] bytes) { + try { + return clazz.cast(mapper.readValue(bytes, clazz)); + } catch (IOException e) { + LOGGER.error("could not convert to json string {}", new String(bytes), e); + return null; + } + } + private String toJsonString(Object data) { try { if (data == null) { diff --git a/data-audit/kogito-addons-data-audit-jpa-common/src/main/java/org/kie/kogito/app/audit/jpa/model/JobExecutionLog.java b/data-audit/kogito-addons-data-audit-jpa-common/src/main/java/org/kie/kogito/app/audit/jpa/model/JobExecutionLog.java index a20864987d..2dfec64a21 100644 --- a/data-audit/kogito-addons-data-audit-jpa-common/src/main/java/org/kie/kogito/app/audit/jpa/model/JobExecutionLog.java +++ b/data-audit/kogito-addons-data-audit-jpa-common/src/main/java/org/kie/kogito/app/audit/jpa/model/JobExecutionLog.java @@ -31,7 +31,7 @@ import javax.persistence.TemporalType; @Entity -@Table(name = "JobExecutionHistory") +@Table(name = "JobExecutionLog") @SequenceGenerator(name = "jobExecutionHistoryIdSeq", sequenceName = "JOB_EXECUTION_HISTORY_ID_SEQ") public class JobExecutionLog { diff --git a/data-audit/kogito-addons-data-audit-jpa-common/src/main/resources/META-INF/job-orm.xml b/data-audit/kogito-addons-data-audit-jpa-common/src/main/resources/META-INF/job-orm.xml index 45dd1b3175..34324c2a50 100644 --- a/data-audit/kogito-addons-data-audit-jpa-common/src/main/resources/META-INF/job-orm.xml +++ b/data-audit/kogito-addons-data-audit-jpa-common/src/main/resources/META-INF/job-orm.xml @@ -26,8 +26,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.state = 'SCHEDULED' @@ -36,8 +36,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.job_id = :jobId @@ -45,7 +45,7 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp - FROM JobExecutionHistory o1 + FROM JobExecutionLog o1 WHERE o1.job_id = :jobId ORDER BY o1.timestamp DESC @@ -54,7 +54,7 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp - FROM JobExecutionHistory o1 + FROM JobExecutionLog o1 WHERE o1.correlation_id = :correlationId ORDER BY o1.timestamp DESC @@ -63,8 +63,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.state IN ('SCHEDULED', 'RETRY') @@ -72,8 +72,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.state IN ('SCHEDULED') @@ -81,8 +81,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestamp - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.state IN ('RETRY', 'ERROR') @@ -91,8 +91,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL ORDER BY o1.timestamp DESC @@ -101,8 +101,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.state = 'EXECUTED' @@ -110,8 +110,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.state = 'ERROR' @@ -119,8 +119,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.state = 'CANCELED' @@ -129,8 +129,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.state IN (:state) ORDER BY o1.timestamp DESC @@ -139,8 +139,8 @@ SELECT o1.job_id as jobId, o1.correlation_id as correlationId, o1.state as state, o1.schedule, o1.retry, o1.execution_timeout as executionTimeout, o1.execution_timeout_unit as executionTimeoutUnit, o1.timestamp as timestmap - FROM JobExecutionHistory o1 - LEFT JOIN JobExecutionHistory o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp + FROM JobExecutionLog o1 + LEFT JOIN JobExecutionLog o2 ON o1.job_id = o2.job_id AND o1.timestamp < o2.timestamp WHERE o2.job_id IS NULL AND o1.correlation_id = :correlationId ORDER BY o1.timestamp DESC diff --git a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/java/org/kie/kogito/app/audit/quarkus/GraphQLJPADataAuditRouter.java b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/java/org/kie/kogito/app/audit/quarkus/GraphQLJPADataAuditRouter.java index f50dfadd86..4405a80d24 100644 --- a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/java/org/kie/kogito/app/audit/quarkus/GraphQLJPADataAuditRouter.java +++ b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/java/org/kie/kogito/app/audit/quarkus/GraphQLJPADataAuditRouter.java @@ -20,8 +20,8 @@ import javax.annotation.PostConstruct; import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; import org.kie.kogito.app.audit.api.DataAuditQueryService; @@ -45,7 +45,7 @@ public class GraphQLJPADataAuditRouter { GraphQLHandler graphQLHandler; - @Inject + @PersistenceContext(unitName = "DataAuditPU") EntityManager entityManager; @PostConstruct diff --git a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/java/org/kie/kogito/app/audit/quarkus/QuarkusJPADataAuditEventPublisher.java b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/java/org/kie/kogito/app/audit/quarkus/QuarkusJPADataAuditEventPublisher.java index 73f5438171..6b9cbb0cd7 100644 --- a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/java/org/kie/kogito/app/audit/quarkus/QuarkusJPADataAuditEventPublisher.java +++ b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/java/org/kie/kogito/app/audit/quarkus/QuarkusJPADataAuditEventPublisher.java @@ -30,6 +30,7 @@ import org.kie.kogito.app.audit.api.DataAuditEventPublisher; import org.kie.kogito.app.audit.api.DataAuditStoreProxyService; import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; import org.kie.kogito.jobs.service.api.Job; @@ -44,7 +45,7 @@ public class QuarkusJPADataAuditEventPublisher implements DataAuditEventPublishe private DataAuditStoreProxyService proxy; - @PersistenceContext + @PersistenceContext(unitName = "DataAuditPU") EntityManager entityManager; public QuarkusJPADataAuditEventPublisher() { @@ -69,9 +70,13 @@ public void publish(DataEvent event) { LOGGER.debug("Processing user task instacne event {}", event); proxy.storeUserTaskInstanceDataEvent(DataAuditContext.newDataAuditContext(entityManager), (UserTaskInstanceDataEvent) event); return; + } else if (event instanceof JobInstanceDataEvent) { + LOGGER.info("Processing job instance event {}", event); + proxy.storeJobDataEvent(DataAuditContext.newDataAuditContext(entityManager), (JobInstanceDataEvent) event); + return; } - LOGGER.debug("Discard event {} as class {} is not supported by this", event, event.getClass().getName()); + LOGGER.info("Discard event {} as class {} is not supported by this", event, event.getClass().getName()); } @Override diff --git a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/resources/application.properties b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/resources/application.properties index b8b3b05ce2..0b034e5aa0 100644 --- a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/resources/application.properties +++ b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/main/resources/application.properties @@ -1,3 +1,3 @@ -quarkus.hibernate-orm.enabled=true -quarkus.hibernate-orm.packages=org.kie.kogito.app.audit.jpa.model -quarkus.hibernate-orm.mapping-files=META-INF/entity-orm.xml,META-INF/job-orm.xml,META-INF/process-orm.xml,META-INF/usertask-orm.xml \ No newline at end of file +quarkus.hibernate-orm.DataAuditPU.enabled=true +quarkus.hibernate-orm.DataAuditPU.packages=org.kie.kogito.app.audit.jpa.model +quarkus.hibernate-orm.DataAuditPU.mapping-files=META-INF/entity-orm.xml,META-INF/job-orm.xml,META-INF/process-orm.xml,META-INF/usertask-orm.xml \ No newline at end of file diff --git a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/java/org/kie/kogito/app/audit/quarkus/DataAuditTestUtils.java b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/java/org/kie/kogito/app/audit/quarkus/DataAuditTestUtils.java index 87be21997d..2dd0d1a66c 100644 --- a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/java/org/kie/kogito/app/audit/quarkus/DataAuditTestUtils.java +++ b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/java/org/kie/kogito/app/audit/quarkus/DataAuditTestUtils.java @@ -254,7 +254,7 @@ private static Map extractProcessInstnaceEventMetadata(ProcessIn // user task instance stuff public static UserTaskInstanceStateDataEvent newUserTaskInstanceStateEvent( - String eventUser, String userTaskDefinitionId, String userTaskInstanceId, String userTaskName, Integer eventType, + String eventUser, String userTaskDefinitionId, String userTaskInstanceId, String userTaskName, String eventType, String userTaskDescription, String userTaskPriority, String userTaskReferenceName, String state, String actualOwner, String processInstanceId) { String processId = UUID.randomUUID().toString(); diff --git a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/java/org/kie/kogito/app/audit/quarkus/QuarkusAuditUserTaskInstanceServiceTest.java b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/java/org/kie/kogito/app/audit/quarkus/QuarkusAuditUserTaskInstanceServiceTest.java index 88eaec0be6..6f7c3ee1bc 100644 --- a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/java/org/kie/kogito/app/audit/quarkus/QuarkusAuditUserTaskInstanceServiceTest.java +++ b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/java/org/kie/kogito/app/audit/quarkus/QuarkusAuditUserTaskInstanceServiceTest.java @@ -64,10 +64,10 @@ public class QuarkusAuditUserTaskInstanceServiceTest { public void init() { UserTaskInstanceStateDataEvent uEvent; - uEvent = newUserTaskInstanceStateEvent("eventUser", "utd1", "1", "utn1", 1, "utd1", "utp1", "utrn1", "Ready", "owner", "1"); + uEvent = newUserTaskInstanceStateEvent("eventUser", "utd1", "1", "utn1", "1", "utd1", "utp1", "utrn1", "Ready", "owner", "1"); publisher.publish(uEvent); - uEvent = newUserTaskInstanceStateEvent("eventUser", "utd1", "1", "utn1", 1, "utd1", "utp1", "utrn1", "Claimed", "owner", "1"); + uEvent = newUserTaskInstanceStateEvent("eventUser", "utd1", "1", "utn1", "1", "utd1", "utp1", "utrn1", "Claimed", "owner", "1"); publisher.publish(uEvent); UserTaskInstanceVariableDataEvent vEvent; @@ -107,7 +107,7 @@ public void init() { deadlineEvent = newUserTaskInstanceDeadlineEvent(uEvent, "eventUser", Collections.singletonMap("input1", "value1"), Collections.singletonMap("notification1", "notificationValue")); publisher.publish(deadlineEvent); - uEvent = newUserTaskInstanceStateEvent("eventUser", "utd2", "2", "utn2", 1, "utd2", "utp2", "utrn2", "Claimed", "owner", "1"); + uEvent = newUserTaskInstanceStateEvent("eventUser", "utd2", "2", "utn2", "1", "utd2", "utp2", "utrn2", "Claimed", "owner", "1"); publisher.publish(uEvent); } diff --git a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/resources/application.properties b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/resources/application.properties index 24a1f9d8fb..bce897bab3 100644 --- a/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/resources/application.properties +++ b/data-audit/kogito-addons-data-audit-jpa-quarkus/src/test/resources/application.properties @@ -3,7 +3,16 @@ quarkus.datasource.username=hibernate quarkus.datasource.password=hibernate quarkus.datasource.jdbc.url=jdbc:h2:mem:test -quarkus.hibernate-orm.database.generation=create-drop +quarkus.datasource.data-audit.db-kind=h2 +quarkus.datasource.data-audit.username=hibernate +quarkus.datasource.data-audit.password=hibernate +quarkus.datasource.data-audit.jdbc.url=jdbc:h2:mem:test + +quarkus.hibernate-orm.DataAuditPU.enabled=true +quarkus.hibernate-orm.DataAuditPU.packages=org.kie.kogito.app.audit.jpa.model +quarkus.hibernate-orm.DataAuditPU.mapping-files=META-INF/entity-orm.xml,META-INF/job-orm.xml,META-INF/process-orm.xml,META-INF/usertask-orm.xml +quarkus.hibernate-orm.DataAuditPU.database.generation=create-drop +quarkus.hibernate-orm.DataAuditPU.datasource=data-audit diff --git a/data-audit/kogito-addons-data-audit-jpa-springboot/src/main/java/org/kie/kogito/app/audit/springboot/SpringBootJPADataAuditEventPublisher.java b/data-audit/kogito-addons-data-audit-jpa-springboot/src/main/java/org/kie/kogito/app/audit/springboot/SpringBootJPADataAuditEventPublisher.java index 60115af506..98334806ec 100644 --- a/data-audit/kogito-addons-data-audit-jpa-springboot/src/main/java/org/kie/kogito/app/audit/springboot/SpringBootJPADataAuditEventPublisher.java +++ b/data-audit/kogito-addons-data-audit-jpa-springboot/src/main/java/org/kie/kogito/app/audit/springboot/SpringBootJPADataAuditEventPublisher.java @@ -22,9 +22,11 @@ import javax.persistence.EntityManager; +import org.kie.kogito.app.audit.api.DataAuditContext; import org.kie.kogito.app.audit.api.DataAuditEventPublisher; import org.kie.kogito.app.audit.api.DataAuditStoreProxyService; import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; import org.kie.kogito.jobs.service.api.Job; @@ -32,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -45,6 +48,7 @@ public class SpringBootJPADataAuditEventPublisher implements DataAuditEventPubli private DataAuditStoreProxyService proxy; @Autowired + @Qualifier("DataAuditPU") EntityManager entityManager; public SpringBootJPADataAuditEventPublisher() { @@ -67,6 +71,10 @@ public void publish(DataEvent event) { LOGGER.debug("Processing user task instacne event {}", event); proxy.storeUserTaskInstanceDataEvent(newDataAuditContext(entityManager), (UserTaskInstanceDataEvent) event); return; + } else if (event instanceof JobInstanceDataEvent) { + LOGGER.info("Processing job instance event {}", event); + proxy.storeJobDataEvent(DataAuditContext.newDataAuditContext(entityManager), (JobInstanceDataEvent) event); + return; } LOGGER.debug("Discard event {} as class {} is not supported by this", event, event.getClass().getName()); diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/event/mapper/UserTaskInstanceStateEventMerger.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/event/mapper/UserTaskInstanceStateEventMerger.java index 9429c87071..3172e965cc 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/event/mapper/UserTaskInstanceStateEventMerger.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/event/mapper/UserTaskInstanceStateEventMerger.java @@ -55,9 +55,9 @@ public void merge(UserTaskInstance task, UserTaskInstanceDataEvent data) { task.setDescription(event.getData().getUserTaskDescription()); task.setState(event.getData().getState()); task.setPriority(event.getData().getUserTaskPriority()); - if (event.getData().getEventType() == null || event.getData().getEventType() == 1) { + if (event.getData().getEventType() == null || "Ready".equals(event.getData().getEventType())) { task.setStarted(toZonedDateTime(event.getData().getEventDate())); - } else if (event.getData().getEventType() == 2) { + } else if ("Completed".equals(event.getData().getEventType())) { task.setCompleted(toZonedDateTime(event.getData().getEventDate())); } diff --git a/data-index/data-index-storage/data-index-storage-postgresql/.gitignore b/data-index/data-index-storage/data-index-storage-postgresql/.gitignore new file mode 100644 index 0000000000..b83d22266a --- /dev/null +++ b/data-index/data-index-storage/data-index-storage-postgresql/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java index 9901ca4ffa..ff7d6a612f 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java @@ -18,6 +18,9 @@ */ package org.kie.kogito.addons.quarkus.jobs.service.embedded.stream; +import java.util.List; +import java.util.stream.Collectors; + import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.Instance; import javax.inject.Inject; @@ -25,8 +28,8 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.kie.kogito.event.AbstractDataEvent; import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.jobs.JobsServiceException; import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter; import org.kie.kogito.jobs.service.model.JobDetails; @@ -55,7 +58,7 @@ public class EventPublisherJobStreams { private final String url; - private final EventPublisher eventPublisher; + private final List eventPublisher; private final ObjectMapper objectMapper; @@ -64,10 +67,7 @@ public EventPublisherJobStreams(@ConfigProperty(name = "kogito.service.url", def Instance eventPublishers, ObjectMapper objectMapper) { this.url = url; - eventPublisher = eventPublishers.stream() - .filter(publisher -> publisher.getClass().getName().startsWith(DATA_INDEX_EVENT_PUBLISHER)) - .findFirst() - .orElse(null); + eventPublisher = eventPublishers.stream().collect(Collectors.toList()); this.objectMapper = objectMapper; } @@ -83,7 +83,7 @@ public void onJobStatusChange(JobDetails jobDetails) { } catch (Exception e) { throw new JobsServiceException("It was not possible to serialize scheduledJob to json: " + scheduledJob, e); } - EventPublisherJobDataEvent event = new EventPublisherJobDataEvent(JOB_EVENT_TYPE, + JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE, url + RestApiConstants.JOBS_PATH, jsonContent, scheduledJob.getProcessInstanceId(), @@ -92,24 +92,11 @@ public void onJobStatusChange(JobDetails jobDetails) { scheduledJob.getRootProcessId(), null); try { - eventPublisher.publish(event); + eventPublisher.forEach(e -> e.publish(event)); } catch (Exception e) { LOGGER.error("Job status change propagation has failed at eventPublisher: " + eventPublisher.getClass() + " execution.", e); } } } - public static class EventPublisherJobDataEvent extends AbstractDataEvent { - public EventPublisherJobDataEvent(String type, - String source, - byte[] data, - String kogitoProcessInstanceId, - String kogitoRootProcessInstanceId, - String kogitoProcessId, - String kogitoRootProcessId, - String kogitoIdentity) { - super(type, source, data, kogitoProcessInstanceId, kogitoRootProcessInstanceId, kogitoProcessId, - kogitoRootProcessId, null, kogitoIdentity); - } - } } diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java index c098cd2d87..0144ac0c4d 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test; import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.index.addon.DataIndexEventPublisherMock; import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient; import org.kie.kogito.jobs.service.model.JobDetails; @@ -87,7 +88,7 @@ class EventPublisherJobStreamsTest { @Test void onJobStatusChange() throws Exception { - ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(EventPublisherJobStreams.EventPublisherJobDataEvent.class); + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(JobInstanceDataEvent.class); DataIndexEventPublisherMock eventPublisher = spy(new DataIndexEventPublisherMock()); Instance eventPublisherInstance = mock(Instance.class); Stream eventPublishers = Arrays.stream(new EventPublisher[] { eventPublisher }); @@ -104,7 +105,7 @@ void onJobStatusChange() throws Exception { verify(eventPublisher).publish(eventCaptor.capture()); verify(eventPublisher, never()).publish(anyCollection()); - EventPublisherJobStreams.EventPublisherJobDataEvent event = eventCaptor.getValue(); + JobInstanceDataEvent event = eventCaptor.getValue(); assertThat(event).isNotNull(); assertThat(event.getSpecVersion()).hasToString("1.0");