Skip to content

Commit

Permalink
[JBPM-10197] Alternative approach
Browse files Browse the repository at this point in the history
Update WildflyEJBTimerRetriever.java
  • Loading branch information
fjtirado committed Nov 27, 2023
1 parent 324e4d0 commit b76d4d3
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.services.ejb.timer;

import java.util.Collection;
import java.util.stream.Collectors;

import javax.ejb.Timer;
import javax.ejb.TimerService;

class DefaultEJBTimerRetriever extends EJBTimerRetriever<Void> {

protected DefaultEJBTimerRetriever(TimerService timerService) {
super(timerService);
}

@Override
public Collection<Object> getTimers(String jobName, Void accepted) {
return timerService.getTimers().stream().map(Timer::getInfo).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.services.ejb.timer;

enum EJBTimerDB {
ORACLE("select info from jboss_ejb_timer where utl_raw.cast_to_varchar2(utl_encode.base64_decode(utl_raw.cast_to_raw(dbms_lob.substr(info,2000,1)))) like '%?%'"),
POSTGRESQL("select info from jboss_ejb_timerwhere decode(info, 'base64') like '%?%'");

private final String query;

private EJBTimerDB (String query) {
this.query = query;
}

public String getQuery() {
return query;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.services.ejb.timer;


import java.util.Collection;
import java.util.Optional;

import javax.ejb.TimerService;

import org.jbpm.process.core.timer.impl.GlobalTimerService;

public abstract class EJBTimerRetriever<T> {

protected final TimerService timerService;

protected EJBTimerRetriever (TimerService timerService) {
this.timerService = timerService;
}

public Optional<T> accept (GlobalTimerService globalTimerService) {
return Optional.empty();
}

public abstract Collection<Object> getTimers(String jobName,T accepted);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
import java.lang.reflect.Method;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -49,9 +54,11 @@
import javax.transaction.UserTransaction;

import org.drools.core.time.JobHandle;
import org.drools.core.time.TimerService;
import org.drools.core.time.impl.TimerJobInstance;
import org.jbpm.persistence.timer.GlobalJpaTimerJobInstance;
import org.jbpm.process.core.timer.TimerServiceRegistry;
import org.jbpm.process.core.timer.impl.GlobalTimerService;
import org.kie.internal.runtime.manager.SessionNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -73,8 +80,12 @@ public class EJBTimerScheduler {

private boolean useLocalCache = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.local.cache", "false"));

private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();
private Map<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();

private Collection<EJBTimerRetriever<?>> timerRetrievers;

private EJBTimerRetriever<Void> defaultTimerRetriever;

@Resource
protected javax.ejb.TimerService timerService;

Expand All @@ -89,6 +100,8 @@ public void setUseLocalCache(boolean useLocalCache) {
public void setup() {
// disable auto init of timers since ejb timer service supports persistence of timers
System.setProperty("org.jbpm.rm.init.timer", "false");
timerRetrievers = Arrays.asList(new WildflyEJBTimerRetriever(timerService));
defaultTimerRetriever = new DefaultEJBTimerRetriever(timerService);
logger.info("Using local cache for EJB timers: {}", useLocalCache);
}

Expand Down Expand Up @@ -353,41 +366,38 @@ public boolean removeJob(JobHandle jobHandle, Timer ejbTimer) {
}


public TimerJobInstance getTimerByName(TimerService service, String jobName) {
return useLocalCache ? localCache.computeIfAbsent(jobName, k -> queryTimerByName(service, k)) : queryTimerByName(service, jobName);
}

public TimerJobInstance getTimerByName(String jobName) {
if (useLocalCache) {
if (localCache.containsKey(jobName)) {
logger.debug("Found job {} in cache returning", jobName);
return localCache.get(jobName);
}
}
TimerJobInstance found = null;

for (Timer timer : timerService.getTimers()) {
try {
Serializable info = timer.getInfo();
if (info instanceof EjbTimerJob) {
EjbTimerJob job = (EjbTimerJob) info;

EjbGlobalJobHandle handle = (EjbGlobalJobHandle) job.getTimerJobInstance().getJobHandle();

if (handle.getUuid().equals(jobName)) {
found = handle.getTimerJobInstance();
if (useLocalCache) {
localCache.putIfAbsent(jobName, found);
}
logger.debug("Job {} does match timer and is going to be returned {}", jobName, found);

break;
}
}
} catch (NoSuchObjectLocalException e) {
logger.debug("Timer info for {} was not found ", timer);
@SuppressWarnings({"rawtypes", "unchecked"})
private Collection<Object> getTimers(TimerService service, String jobName) {
if (service instanceof GlobalTimerService) {
for (EJBTimerRetriever retriever : timerRetrievers) {
Optional<?> accepted = retriever.accept((GlobalTimerService) service);
if (accepted.isPresent()) {
return retriever.getTimers(jobName, accepted.get());
}
}
}
}
return defaultTimerRetriever.getTimers(jobName, null);
}

return found;
}
private TimerJobInstance queryTimerByName(TimerService service, String jobName) {
for (Object timer : getTimers(service, jobName)) {
try {
if (timer instanceof EjbTimerJob) {
EjbGlobalJobHandle handle = (EjbGlobalJobHandle) ((EjbTimerJob) timer).getTimerJobInstance().getJobHandle();
if (handle.getUuid().equals(jobName)) {
return handle.getTimerJobInstance();
}
}
} catch (NoSuchObjectLocalException e) {
logger.debug("Timer info for {} was not found {}", timer, e.getMessage());
}
}
return null;
}

public void evictCache(JobHandle jobHandle) {
String jobName = ((EjbGlobalJobHandle) jobHandle).getUuid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
if (!ctx.isNew()) {
jobInstance = getTimerJobInstance(jobName);
if (jobInstance == null) {
jobInstance = scheduler.getTimerByName(jobName);
jobInstance = scheduler.getTimerByName(globalTimerService, jobName);
}
if (jobInstance != null) {
return jobInstance.getJobHandle();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.services.ejb.timer;

import javax.persistence.EntityManagerFactory;

class WildflyAcceptedInfo {
private final EJBTimerDB db;
private final EntityManagerFactory emf;

public WildflyAcceptedInfo(EJBTimerDB db, EntityManagerFactory emf) {
this.db = db;
this.emf = emf;
}
public EntityManagerFactory getEmf() {
return emf;
}

public EJBTimerDB getDb() {
return db;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.services.ejb.timer;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.ejb.TimerService;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import org.jbpm.process.core.timer.impl.GlobalTimerService;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WildflyEJBTimerRetriever extends EJBTimerRetriever<WildflyAcceptedInfo> {

private final static Logger logger = LoggerFactory.getLogger(WildflyEJBTimerRetriever.class);
private final boolean isPersistentWildfly;
private Object persistence;


protected WildflyEJBTimerRetriever(TimerService timerService) {
super(timerService);
isPersistentWildfly = isPersistentWildfly();
if (isPersistentWildfly) {
logger.info ("EJBTimer is using Wildfly persistence");
}
}

private boolean isPersistentWildfly() {
Class<? extends TimerService> clazz = timerService.getClass();
if (clazz.getName().equals("org.jboss.as.ejb3.timerservice.TimerServiceImpl")) {
try {
Field field = clazz.getDeclaredField("persistence");
field.setAccessible(true);
persistence = field.get(timerService);
return persistence != null && persistence.getClass().getName().equals("org.jboss.as.ejb3.timerservice.persistence.database.DatabaseTimerPersistence");
} catch (ReflectiveOperationException ex) {
logger.trace("Exception retrieving timer service persistence field", ex);
}
}
return false;
}


@Override
public Optional<WildflyAcceptedInfo> accept(GlobalTimerService globalTimerService) {
return isPersistentWildfly ? getDB(globalTimerService.getRuntimeManager()) : Optional.empty();
}

private Optional<WildflyAcceptedInfo> getDB(InternalRuntimeManager runtime) {
EntityManagerFactory emf = EntityManagerFactoryManager.get().getOrCreate(runtime.getDeploymentDescriptor().getPersistenceUnit());
for (Object value : emf.getProperties().values()) {
if (value instanceof String) {
String search = value.toString().toLowerCase();
if (search.contains("oracle")) {
return Optional.of(new WildflyAcceptedInfo(EJBTimerDB.ORACLE, emf));
} else if (search.contains("postgresql")) {
return Optional.of(new WildflyAcceptedInfo(EJBTimerDB.POSTGRESQL,emf));
}
}
}
return Optional.empty();
}

@Override
public Collection<Object> getTimers(String jobName, WildflyAcceptedInfo accepted) {
EntityManager em = accepted.getEmf().createEntityManager();
List<String> results = em.createNativeQuery(accepted.getDb().getQuery(), String.class).setParameter(1, jobName).getResultList();
return results.stream().map(this::deserialize).collect(Collectors.toList());
}

private Object deserialize( String info) {
try {
Method method = persistence.getClass().getMethod("deSerialize", String.class);
return method.invoke(persistence, info);
} catch (ReflectiveOperationException e) {
throw new IllegalStateException("Error deserializing info", e);
}
}

}
Loading

0 comments on commit b76d4d3

Please sign in to comment.