Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JBPM-10209] Switching to CMT #2362

Merged
merged 4 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,18 @@

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.NoSuchObjectLocalException;
import javax.ejb.SessionContext;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerHandle;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.transaction.RollbackException;
import javax.transaction.UserTransaction;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import org.drools.core.time.JobHandle;
import org.drools.core.time.impl.TimerJobInstance;
Expand All @@ -58,8 +55,6 @@

@Singleton
@Startup
@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER)
@TransactionManagement(TransactionManagementType.BEAN)
@Lock(LockType.READ)
public class EJBTimerScheduler {

Expand All @@ -70,17 +65,19 @@ public class EJBTimerScheduler {
private static final Integer TIMER_RETRY_LIMIT = Integer.parseInt(System.getProperty("org.kie.jbpm.timer.retry.limit", "3"));

private static final Integer OVERDUE_WAIT_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.wait", "20000"));

private static final Integer OVERDUE_CHECK_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.check", "200"));
fjtirado marked this conversation as resolved.
Show resolved Hide resolved

private boolean useLocalCache = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.local.cache", "false"));

private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();

@Resource
protected javax.ejb.TimerService timerService;

@Resource
protected UserTransaction utx;

protected SessionContext ctx;
public void setUseLocalCache(boolean useLocalCache) {
this.useLocalCache = useLocalCache;
}
Expand All @@ -97,112 +94,78 @@ public void executeTimerJob(Timer timer) {
EjbTimerJob timerJob = (EjbTimerJob) timer.getInfo();
TimerJobInstance timerJobInstance = timerJob.getTimerJobInstance();
logger.debug("About to execute timer for job {}", timerJob);

String timerServiceId = ((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId();

// handle overdue timers as ejb timer service might start before all deployments are ready
long time = 0;
while (TimerServiceRegistry.getInstance().get(timerServiceId) == null) {
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
time += 500;

if (time > OVERDUE_WAIT_TIME) {
logger.debug("No timer service found after waiting {} ms", time);
break;
try {
while (TimerServiceRegistry.getInstance().get(((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId()) == null) {
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
Thread.sleep(OVERDUE_CHECK_TIME);
time += OVERDUE_CHECK_TIME;
if (time > OVERDUE_WAIT_TIME) {
logger.debug("No timer service found after waiting {} ms", time);
break;
}
}
}
} catch (InterruptedException e) {
logger.warn("Thread has been interrupted", e);
Thread.currentThread().interrupt();
}
try {
transaction(this::executeTimerJobInstance, timerJobInstance);
} catch (SessionNotFoundException e) {
logger.warn("Process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, e);
removeUnrecoverableTimer(timerJob, timer);
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
} catch (Exception e) {
recoverTimerJobInstance(timerJob, timer, e);
}
}

private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws Exception {
try {
((Callable<?>) timerJobInstance).call();
} catch (Exception e) {
throw e;
}
((Callable<?>) timerJobInstance).call();
}

private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) {
try {
Transaction<TimerJobInstance> tx = timerJobInstance -> {
if (!this.removeJob(timerJobInstance.getJobHandle(), timer)) {
logger.warn("Session not found for timer {}. Timer could not removed.", ejbTimerJob.getTimerJobInstance());
}
};
transaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("There was a problem during timer removal {}", ejbTimerJob.getTimerJobInstance(), e1);
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception e) {
if (isSessionNotFound(e)) {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", ejbTimerJob.getTimerJobInstance(), e);
private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception cause) {
Transaction<TimerJobInstance> tx;
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
if (isSessionNotFound(cause)) {
// if session is not found means the process has already finished. In this case we just need to remove
// the timer and avoid any recovery as it should not trigger any more timers.
removeUnrecoverableTimer(ejbTimerJob, timer);
return;
tx = timerJobInstance -> {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause);
if (!removeJob(timerJobInstance.getJobHandle(), timer)) {
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
logger.warn("Session not found for timer {}. Timer could not removed.", timerJobInstance);
}
};
}

if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
// this is an interval trigger. Problem here is that the timer scheduled by DefaultTimerJobInstance is lost
// because of the transaction, so we need to do this here.
try {

logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", ejbTimerJob.getTimerJobInstance());
Transaction<TimerJobInstance> tx = timerJobInstance -> {
if (this.removeJob(timerJobInstance.getJobHandle(), null)) {
this.internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", ejbTimerJob.getTimerJobInstance());
}
};
transaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("Could not schedule the interval trigger {}", ejbTimerJob.getTimerJobInstance(), e1);
}
return;
tx = timerJobInstance -> {
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
if (removeJob(timerJobInstance.getJobHandle(), null)) {
internalSchedule(timerJobInstance);
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance);
}
};
}
else {
// if there is not next date to be fired, we need to apply policy otherwise will be lost
tx = timerJobInstance -> {
logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance);
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance);
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT);
} else {
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle());
((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer));
}
};
}

// if there is not next date to be fired, we need to apply policy otherwise will be lost

logger.warn("Execution of time failed. The timer will be retried {}", ejbTimerJob.getTimerJobInstance());
Transaction<TimerJobInstance> operation = (instance) -> {
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = null;
if(ejbTimerJob instanceof EjbTimerJobRetry) {
info = ((EjbTimerJobRetry) ejbTimerJob).next();
} else {
info = new EjbTimerJobRetry(instance);
}
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", instance, TIMER_RETRY_LIMIT);
return;
}
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
TimerHandle handler = newTimer.getHandle();
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setTimerInfo(handler);
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setExternalTimerId(getPlatformTimerId(newTimer));
};
try {
transaction(operation, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.error("Failed to executed timer recovery {}", e1.getMessage(), e1);
invokeTransaction (tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e) {
logger.error("Failed to executed timer recovery", e);
}

}

private boolean isSessionNotFound(Exception e) {
Expand All @@ -218,30 +181,16 @@ private boolean isSessionNotFound(Exception e) {

@FunctionalInterface
private interface Transaction<I> {

void doWork(I item) throws Exception;
}

private <I> void transaction(Transaction<I> operation, I item) throws Exception {
try {
utx.begin();
operation.doWork(item);
utx.commit();
} catch(RollbackException e) {
logger.warn("Transaction was rolled back for {} with status {}", item, utx.getStatus());
if(utx.getStatus() == javax.transaction.Status.STATUS_ACTIVE) {
utx.rollback();
}
throw new RuntimeException("jbpm timer has been rolledback", e);
} catch (Exception e) {
try {
utx.rollback();
} catch (Exception re) {
logger.error("transaction could not be rolled back", re);
}

throw e;
}
@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
operation.doWork(item);
}

private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
ctx.getBusinessObject(EJBTimerScheduler.class).transaction(operation,item);
}

public void internalSchedule(TimerJobInstance timerJobInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.drools.core.time.impl.TimerJobInstance;
import org.drools.persistence.api.TransactionManager;
import org.drools.persistence.api.TransactionManagerFactory;
import org.drools.persistence.api.TransactionSynchronization;
import org.drools.persistence.jta.JtaTransactionManager;
import org.jbpm.process.core.timer.GlobalSchedulerService;
import org.jbpm.process.core.timer.JobNameHelper;
Expand Down Expand Up @@ -99,34 +98,12 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {

@Override
public boolean removeJob(JobHandle jobHandle) {
String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid();
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid));
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(((EjbGlobalJobHandle) jobHandle).getUuid()));
if (TRANSACTIONAL && ejbTimer == null) {
// this situation needs to be avoided as it should not happen
return false;
}
JtaTransactionManager tm = (JtaTransactionManager) TransactionManagerFactory.get().newTransactionManager();
try {
tm.registerTransactionSynchronization(new TransactionSynchronization() {
@Override
public void beforeCompletion() {
}

@Override
public void afterCompletion(int status) {
if (status == TransactionManager.STATUS_COMMITTED) {
logger.debug("remove job {} after commited", jobHandle);
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
scheduler.removeJob(jobHandle, ejbTimer);
}
}

});
logger.debug("register tx to remove job {}", jobHandle);
return true;
} catch (Exception e) {
logger.debug("remove job {} outside tx", jobHandle);
return scheduler.removeJob(jobHandle, ejbTimer);
}
return scheduler.removeJob(jobHandle, ejbTimer);
}

private TimerJobInstance getTimerJobInstance (String uuid) {
Expand Down
Loading