
Commit

Add async processor support. Change the IBatchConsumer interface parameter to the Collection type.
entropy-cloud committed Dec 11, 2024
1 parent 932e440 commit f22bb14
Showing 41 changed files with 611 additions and 122 deletions.
290 changes: 268 additions & 22 deletions docs/theory/why-springbatch-is-bad.md

Large diffs are not rendered by default.

@@ -0,0 +1,17 @@
package io.nop.batch.core;

import io.nop.api.core.annotations.core.Description;
import io.nop.api.core.config.IConfigReference;
import io.nop.api.core.util.SourceLocation;

import java.time.Duration;

import static io.nop.api.core.config.AppConfig.varRef;

public interface BatchConfigs {
SourceLocation s_loc = SourceLocation.fromClass(BatchConfigs.class);

@Description("异步Processor执行的缺省超时时间")
IConfigReference<Duration> CFG_BATCH_ASYNC_PROCESS_TIMEOUT = varRef(s_loc, "nop.batch.async-process-timeout",
Duration.class, Duration.ofMinutes(10)); // 缺省为10分钟
}
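As a sketch of how this key is read at runtime (resolveTimeoutMillis is an illustrative helper, not part of this commit; it mirrors the fallback logic used by BatchProcessorConsumer further down):

import io.nop.batch.core.BatchConfigs;

public class AsyncTimeoutExample {
    // Effective timeout in milliseconds: an explicit per-task value wins,
    // otherwise fall back to nop.batch.async-process-timeout (default: 10 minutes).
    static long resolveTimeoutMillis(long configuredMillis) {
        if (configuredMillis > 0)
            return configuredMillis;
        return BatchConfigs.CFG_BATCH_ASYNC_PROCESS_TIMEOUT.get().toMillis();
    }
}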
@@ -16,6 +16,8 @@ public interface BatchConstants {

String DEFAULT_METER_PREFIX = "nop.";

String SNAPSHOT_BUILDER_ORM_ENTITY = "orm-entity";

String METER_TASK = "batch.task";
String METER_CHUNK = "batch.chunk";
String METER_LOAD = "batch.load";
@@ -19,15 +19,14 @@
import io.nop.batch.core.consumer.RetryBatchConsumer;
import io.nop.batch.core.consumer.SkipBatchConsumer;
import io.nop.batch.core.consumer.WithHistoryBatchConsumer;
import io.nop.batch.core.impl.BatchTaskExecution;
import io.nop.batch.core.impl.BatchTask;
import io.nop.batch.core.loader.ChunkSortBatchLoader;
import io.nop.batch.core.loader.InvokerBatchLoader;
import io.nop.batch.core.loader.PartitionDispatchLoaderProvider;
import io.nop.batch.core.loader.RetryBatchLoader;
import io.nop.batch.core.processor.BatchChunkProcessor;
import io.nop.batch.core.processor.BatchSequentialProcessor;
import io.nop.batch.core.processor.InvokerBatchChunkProcessor;
import io.nop.commons.concurrent.executor.ExecutorHelper;
import io.nop.commons.concurrent.executor.GlobalExecutors;
import io.nop.commons.concurrent.ratelimit.DefaultRateLimiter;
import io.nop.commons.functional.IFunctionInvoker;
@@ -50,6 +49,8 @@ public class BatchTaskBuilder<S, R> implements IBatchTaskBuilder {
private IBatchConsumerProvider<R> consumer;
private boolean useBatchRequestGenerator;
private IBatchProcessorProvider<S, R> processor;
private boolean asyncProcessor;
private long asyncProcessTimeout;
private int batchSize = 100;

/**
@@ -69,7 +70,7 @@ public class BatchTaskBuilder<S, R> implements IBatchTaskBuilder {

private boolean singleSession;
private BatchTransactionScope batchTransactionScope = BatchTransactionScope.consume;
private Executor executor = ExecutorHelper.syncExecutor();
private Executor executor;

private IBatchStateStore stateStore;
private Boolean allowStartIfComplete;
@@ -156,6 +157,18 @@ public BatchTaskBuilder<S, R> taskKeyExpr(IEvalFunction expr) {
return this;
}

@PropertySetter
public BatchTaskBuilder<S, R> asyncProcessor(boolean asyncProcessor) {
this.asyncProcessor = asyncProcessor;
return this;
}

@PropertySetter
public BatchTaskBuilder<S, R> asyncProcessTimeout(long asyncProcessTimeout) {
this.asyncProcessTimeout = asyncProcessTimeout;
return this;
}
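A hedged usage sketch of the two new setters (assuming an already-configured BatchTaskBuilder instance; the timeout is in milliseconds, and a value of 0 or less falls back to nop.batch.async-process-timeout):

import io.nop.batch.core.BatchTaskBuilder;

public class AsyncBuilderExample {
    static <S, R> void enableAsyncProcessing(BatchTaskBuilder<S, R> builder) {
        builder.asyncProcessor(true)           // processor is allowed to complete items asynchronously
                .asyncProcessTimeout(60_000L); // wait at most 60s per chunk; 0 = use the global default
    }
}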

public void addTaskInitializer(Consumer<IBatchTaskContext> initializer) {
if (taskInitializers == null)
taskInitializers = new ArrayList<>();
@@ -310,7 +323,16 @@ public IBatchTask buildTask(IBatchTaskContext context) {
if (context.getStartLimit() <= 0)
context.setStartLimit(startLimit);

return new BatchTaskExecution<S>(taskName, taskVersion == null ? 0 : taskVersion, taskKeyExpr,
Executor executor = this.executor;
if (executor == null) {
if (concurrency > 0) {
executor = GlobalExecutors.cachedThreadPool();
} else {
executor = GlobalExecutors.syncExecutor();
}
}

return new BatchTask<S>(taskName, taskVersion == null ? 0 : taskVersion, taskKeyExpr,
executor, concurrency, taskInitializers, this::buildLoader, this::buildChunkProcessor, stateStore);
}

@@ -367,7 +389,7 @@ protected IBatchChunkProcessorProvider.IBatchChunkProcessor<S> buildChunkProcess
if (useBatchRequestGenerator) {
processor = new BatchSequentialProcessor(processor);
}
consumer = new BatchProcessorConsumer<>(processor, (IBatchConsumer<R>) consumer);
consumer = new BatchProcessorConsumer<>(processor, (IBatchConsumer<R>) consumer, asyncProcessor, asyncProcessTimeout);
}

// Save processing history to avoid duplicate processing
@@ -12,6 +12,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

/**
* One execution unit of batch processing. For example, 100 records form one chunk; the transaction is committed once after the entire chunk has been processed, instead of once per record.
@@ -86,4 +87,10 @@ default boolean isRetrying() {
boolean isSingleMode();

void setSingleMode(boolean singleMode);

void initChunkLatch(CountDownLatch latch);

CountDownLatch getChunkLatch();

void countDown();
}
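The latch lifecycle implied by these new methods, as a sketch (in the framework the consumer side is handled by BatchProcessorConsumer when asyncProcessor is enabled; itemCount and timeoutMillis are placeholders):

import io.nop.batch.core.IBatchChunkContext;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ChunkLatchSketch {
    static void awaitChunk(IBatchChunkContext chunkCtx, int itemCount, long timeoutMillis)
            throws InterruptedException {
        // One count per item in the chunk, initialized before processing starts.
        chunkCtx.initChunkLatch(new CountDownLatch(itemCount));

        // ... asynchronous processors call chunkCtx.countDown() once per delivered result ...

        // The consumer waits for all counts before flushing the buffered outputs.
        if (!chunkCtx.getChunkLatch().await(timeoutMillis, TimeUnit.MILLISECONDS))
            throw new IllegalStateException("chunk did not complete within " + timeoutMillis + "ms");
    }
}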
@@ -2,7 +2,7 @@

import io.nop.batch.core.consumer.FilteredBatchConsumer;

import java.util.List;
import java.util.Collection;

public interface IBatchConsumerProvider<R> {

@@ -25,6 +25,6 @@ interface IBatchConsumer<R> {
* @param items   the collection of objects to be processed
* @param context the context object
*/
void consume(List<R> items, IBatchChunkContext context);
void consume(Collection<R> items, IBatchChunkContext context);
}
}
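A trivial implementation against the new Collection-based signature, for illustration only (PrintlnBatchConsumer is not part of this commit):

import io.nop.batch.core.IBatchChunkContext;
import io.nop.batch.core.IBatchConsumerProvider.IBatchConsumer;

import java.util.Collection;

public class PrintlnBatchConsumer<R> implements IBatchConsumer<R> {
    @Override
    public void consume(Collection<R> items, IBatchChunkContext context) {
        // items is now any Collection, e.g. the ConcurrentLinkedQueue used in async mode.
        for (R item : items) {
            System.out.println("consumed: " + item);
        }
    }
}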
@@ -7,7 +7,7 @@
*/
package io.nop.batch.core;

import java.util.List;
import java.util.Collection;

/**
* Records which items have already been processed so they are not processed again. It usually shares a transaction with the processing function, which guarantees that the history entry is saved whenever processing succeeds.
@@ -19,7 +19,7 @@ public interface IBatchRecordHistoryStore<S> {
* @param records the records to be processed
* @param context the task context
*/
List<S> filterProcessed(List<S> records, IBatchChunkContext context);
Collection<S> filterProcessed(Collection<S> records, IBatchChunkContext context);

void saveProcessed(List<S> filtered, Throwable exception, IBatchChunkContext context);
void saveProcessed(Collection<S> filtered, Throwable exception, IBatchChunkContext context);
}
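A sketch of an in-memory store against the widened signatures (InMemoryHistoryStore is illustrative only; real implementations would persist the history in the same transaction as the consumer):

import io.nop.batch.core.IBatchChunkContext;
import io.nop.batch.core.IBatchRecordHistoryStore;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryHistoryStore<S> implements IBatchRecordHistoryStore<S> {
    private final Set<S> processed = ConcurrentHashMap.newKeySet();

    @Override
    public Collection<S> filterProcessed(Collection<S> records, IBatchChunkContext context) {
        Collection<S> remaining = new ArrayList<>();
        for (S record : records) {
            if (!processed.contains(record))
                remaining.add(record);
        }
        return remaining;
    }

    @Override
    public void saveProcessed(Collection<S> filtered, Throwable exception, IBatchChunkContext context) {
        // Only mark records as processed when the chunk succeeded.
        if (exception == null)
            processed.addAll(filtered);
    }
}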
@@ -1,13 +1,13 @@
package io.nop.batch.core;

import java.util.List;
import java.util.Collection;

public interface IBatchRecordSnapshotBuilder<S> {
interface ISnapshot<S> {
List<S> restore(List<S> items, IBatchChunkContext chunkContext);
Collection<S> restore(Collection<S> items, IBatchChunkContext chunkContext);

void onError(Throwable e);
}

ISnapshot<S> buildSnapshot(List<S> items, IBatchChunkContext chunkContext);
ISnapshot<S> buildSnapshot(Collection<S> items, IBatchChunkContext chunkContext);
}
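For reference, a do-nothing implementation against the new Collection-based contract (IdentitySnapshotBuilder is illustrative only; the "orm-entity" name registered in BatchConstants presumably maps to an ORM-aware builder elsewhere in this commit):

import io.nop.batch.core.IBatchChunkContext;
import io.nop.batch.core.IBatchRecordSnapshotBuilder;

import java.util.Collection;

public class IdentitySnapshotBuilder<S> implements IBatchRecordSnapshotBuilder<S> {
    @Override
    public ISnapshot<S> buildSnapshot(Collection<S> items, IBatchChunkContext chunkContext) {
        return new ISnapshot<S>() {
            @Override
            public Collection<S> restore(Collection<S> restoreItems, IBatchChunkContext ctx) {
                // Identity snapshot: hand the items back unchanged on retry.
                return restoreItems;
            }

            @Override
            public void onError(Throwable e) {
                // Nothing to clean up for an identity snapshot.
            }
        };
    }
}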
@@ -13,7 +13,7 @@
import io.nop.core.context.IServiceContext;
import io.nop.core.utils.IVarSet;

import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -84,9 +84,9 @@ default void setPersistVar(String name, Object value) {
/**
* The data partition range handled by this task execution
*/
IntRangeBean getPartition();
IntRangeBean getPartitionRange();

void setPartition(IntRangeBean partition);
void setPartitionRange(IntRangeBean partitionRange);

boolean isRecoverMode();

@@ -145,15 +145,15 @@ default void setPersistVar(String name, Object value) {

void onChunkEnd(BiConsumer<IBatchChunkContext, Throwable> action);

void onChunkTryBegin(BiConsumer<List<?>, IBatchChunkContext> action);
void onChunkTryBegin(BiConsumer<Collection<?>, IBatchChunkContext> action);

void onChunkTryEnd(BiConsumer<IBatchChunkContext, Throwable> action);

void onLoadBegin(BiConsumer<Integer, IBatchChunkContext> action);

void onLoadEnd(BiConsumer<IBatchChunkContext, Throwable> action);

void onConsumeBegin(BiConsumer<List<?>, IBatchChunkContext> action);
void onConsumeBegin(BiConsumer<Collection<?>, IBatchChunkContext> action);

void onConsumeEnd(BiConsumer<IBatchChunkContext, Throwable> action);

@@ -170,11 +170,11 @@ default void setPersistVar(String name, Object value) {

void fireChunkEnd(IBatchChunkContext chunkContext, Throwable err);

void fireChunkTryBegin(List<?> items, IBatchChunkContext chunkContext);
void fireChunkTryBegin(Collection<?> items, IBatchChunkContext chunkContext);

void fireChunkTryEnd(IBatchChunkContext chunkContext, Throwable err);

void fireConsumeBegin(List<?> items, IBatchChunkContext chunkContext);
void fireConsumeBegin(Collection<?> items, IBatchChunkContext chunkContext);

void fireConsumeEnd(IBatchChunkContext chunkContext, Throwable err);
}
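A small sketch of registering one of the Collection-based listeners (ConsumeListenerExample is illustrative only):

import io.nop.batch.core.IBatchTaskContext;

public class ConsumeListenerExample {
    static void register(IBatchTaskContext taskContext) {
        // The callback now receives a Collection<?> rather than a List<?>.
        taskContext.onConsumeBegin((items, chunkCtx) ->
                System.out.println("about to consume " + items.size() + " items"));
    }
}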
@@ -7,15 +7,22 @@
*/
package io.nop.batch.core.consumer;

import io.nop.api.core.exceptions.NopException;
import io.nop.api.core.exceptions.NopTimeoutException;
import io.nop.api.core.util.ICancellable;
import io.nop.batch.core.IBatchChunkContext;
import io.nop.batch.core.IBatchConsumerProvider.IBatchConsumer;
import io.nop.batch.core.IBatchProcessorProvider.IBatchProcessor;
import io.nop.batch.core.IBatchTaskMetrics;
import io.nop.batch.core.exceptions.BatchCancelException;

import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.nop.batch.core.BatchConfigs.CFG_BATCH_ASYNC_PROCESS_TIMEOUT;
import static io.nop.batch.core.BatchErrors.ERR_BATCH_CANCEL_PROCESS;

/**
@@ -27,55 +27,94 @@
public class BatchProcessorConsumer<S, R> implements IBatchConsumer<S> {
private final IBatchProcessor<S, R> processor;
private final IBatchConsumer<R> consumer;
private final boolean async;
private final long asyncProcessTimeout;

public BatchProcessorConsumer(IBatchProcessor<S, R> processor,
IBatchConsumer<R> consumer) {
IBatchConsumer<R> consumer, boolean async, long asyncProcessTimeout) {
this.processor = processor;
this.consumer = consumer;
this.async = async;
this.asyncProcessTimeout = asyncProcessTimeout;
}

public BatchProcessorConsumer(IBatchProcessor<S, R> processor, IBatchConsumer<R> consumer) {
this(processor, consumer, false, 0L);
}

@Override
public void consume(List<S> items, IBatchChunkContext context) {
public void consume(Collection<S> items, IBatchChunkContext batchChunkCtx) {

IBatchTaskMetrics metrics = context.getTaskContext().getMetrics();
IBatchTaskMetrics metrics = batchChunkCtx.getTaskContext().getMetrics();

if (async) {
batchChunkCtx.initChunkLatch(new CountDownLatch(items.size()));
}

// Assume a synchronous processing model: all output data is buffered here and is handed to the consumer only after every element in the list has been processed successfully.
List<R> collector = new ArrayList<>();
Collection<R> outputs = newOutputs();

for (S item : items) {
Object meter = metrics == null ? null : metrics.beginProcess();
boolean success = false;
try {
context.incProcessCount();
context.getTaskContext().incProcessItemCount(1);
processor.process(item, collector::add, context);
batchChunkCtx.incProcessCount();
batchChunkCtx.getTaskContext().incProcessItemCount(1);
// The processor may execute asynchronously internally. If it does, it must call batchChunkCtx.countDown() after it finishes.
processor.process(item, outputs::add, batchChunkCtx);
success = true;
} finally {
if (metrics != null)
metrics.endProcess(meter, success);
}

if (context.getTaskContext().isCancelled())
if (batchChunkCtx.getTaskContext().isCancelled())
throw new BatchCancelException(ERR_BATCH_CANCEL_PROCESS);

if (context.isCancelled())
if (batchChunkCtx.isCancelled())
throw new BatchCancelException(ERR_BATCH_CANCEL_PROCESS);
}

consumeResult(collector, context);
if (async) {
try {
long timeout = asyncProcessTimeout;
if (timeout <= 0)
timeout = CFG_BATCH_ASYNC_PROCESS_TIMEOUT.get().toMillis();

if (!batchChunkCtx.getChunkLatch().await(timeout, TimeUnit.MILLISECONDS)) {
batchChunkCtx.cancel(ICancellable.CANCEL_REASON_TIMEOUT);
throw new NopTimeoutException();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw NopException.adapt(e);
}
}

// Invoke the consumer even for an empty collection, since it may still trigger an ORM flush.
consumeResult(outputs, batchChunkCtx);
}

protected Collection<R> newOutputs() {
return async ? new ConcurrentLinkedQueue<>() : new ArrayList<>();
}

void consumeResult(List<R> collector, IBatchChunkContext context) {
void consumeResult(Collection<R> outputs, IBatchChunkContext context) {
IBatchTaskMetrics metrics = context.getTaskContext().getMetrics();
Object meter = metrics == null ? null : metrics.beginConsume(collector.size());
Object meter = metrics == null ? null : metrics.beginConsume(outputs.size());

boolean success = false;
try {
consumer.consume(collector, context);
context.getTaskContext().fireConsumeBegin(outputs, context);
consumer.consume(outputs, context);
context.getTaskContext().fireConsumeEnd(context, null);
success = true;
} catch (Exception e) {
context.getTaskContext().fireConsumeEnd(context, e);
throw NopException.adapt(e);
} finally {
if (metrics != null) {
metrics.endConsume(meter, collector.size(), success);
metrics.endConsume(meter, outputs.size(), success);
}
}
}
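To illustrate the contract above, a sketch of an asynchronous processor compatible with asyncProcessor=true. It assumes IBatchProcessor.process takes (item, java.util.function.Consumer, IBatchChunkContext), as the call site processor.process(item, outputs::add, batchChunkCtx) suggests; enrich() and the use of the common pool are placeholders.

import io.nop.batch.core.IBatchChunkContext;
import io.nop.batch.core.IBatchProcessorProvider.IBatchProcessor;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class AsyncEnrichProcessor<S, R> implements IBatchProcessor<S, R> {
    @Override
    public void process(S item, Consumer<R> consumer, IBatchChunkContext chunkCtx) {
        // Return immediately; the real work happens on another thread.
        CompletableFuture.runAsync(() -> {
            try {
                consumer.accept(enrich(item)); // outputs land in the concurrent output queue
            } finally {
                chunkCtx.countDown();          // must be called exactly once per item
            }
        });
    }

    @SuppressWarnings("unchecked")
    private R enrich(S item) {
        // Placeholder transformation; a real processor would call a remote service, etc.
        return (R) item;
    }
}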
@@ -14,7 +14,7 @@
import io.nop.batch.core.IBatchTaskContext;
import io.nop.commons.concurrent.IBlockingSink;

import java.util.List;
import java.util.Collection;

public class BlockingSinkBatchConsumer<R> implements IBatchConsumer<R>, IBatchConsumerProvider<R> {
private IBlockingSink<R> sink;
@@ -33,7 +33,7 @@ public IBatchConsumer<R> setup(IBatchTaskContext context) {
}

@Override
public void consume(List<R> items, IBatchChunkContext context) {
public void consume(Collection<R> items, IBatchChunkContext context) {
try {
sink.sendMulti(items);
} catch (InterruptedException e) {