From f22bb14d4355957d89280c057a86c8909d918ce9 Mon Sep 17 00:00:00 2001 From: canonical Date: Wed, 11 Dec 2024 18:51:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0async=20processor=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E3=80=82IBatchConsumer=E7=9A=84=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E4=BF=AE=E6=94=B9=E4=B8=BACollection?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/theory/why-springbatch-is-bad.md | 290 ++++++++++++++++-- .../java/io/nop/batch/core/BatchConfigs.java | 17 + .../io/nop/batch/core/BatchConstants.java | 2 + .../io/nop/batch/core/BatchTaskBuilder.java | 32 +- .../io/nop/batch/core/IBatchChunkContext.java | 7 + .../batch/core/IBatchConsumerProvider.java | 4 +- .../batch/core/IBatchRecordHistoryStore.java | 6 +- .../core/IBatchRecordSnapshotBuilder.java | 6 +- .../io/nop/batch/core/IBatchTaskContext.java | 14 +- .../core/consumer/BatchProcessorConsumer.java | 76 ++++- .../consumer/BlockingSinkBatchConsumer.java | 4 +- .../core/consumer/EmptyBatchConsumer.java | 4 +- .../core/consumer/FilteredBatchConsumer.java | 3 +- .../core/consumer/InvokerBatchConsumer.java | 4 +- .../core/consumer/MultiBatchConsumer.java | 3 +- .../core/consumer/RateLimitConsumer.java | 4 +- .../ResourceRecordConsumerProvider.java | 3 +- .../core/consumer/RetryBatchConsumer.java | 15 +- .../core/consumer/SkipBatchConsumer.java | 4 +- .../core/consumer/SplitBatchConsumer.java | 3 +- .../consumer/WithHistoryBatchConsumer.java | 6 +- .../core/impl/BatchChunkContextImpl.java | 17 + ...BatchTaskExecution.java => BatchTask.java} | 14 +- .../batch/core/impl/BatchTaskContextImpl.java | 31 +- .../java/io/nop/batch/core/TestBatchTask.java | 3 +- .../ModelBasedBatchTaskBuilderFactory.java | 20 ++ .../batch/dsl/model/_gen/_BatchTaskModel.java | 84 +++++ .../resources/_vfs/nop/batch/xlib/batch.xlib | 8 + .../io/nop/batch/dsl/TestBatchTaskDsl.java | 2 +- .../consumer/JdbcInsertBatchConsumer.java | 4 +- .../jdbc/consumer/JdbcKeyDuplicateFilter.java | 9 +- .../consumer/JdbcUpdateBatchConsumer.java | 3 +- .../jdbc/loader/JdbcBatchLoaderProvider.java | 2 +- .../batch/orm/consumer/OrmBatchConsumer.java | 2 +- .../orm/consumer/OrmInsertBatchConsumer.java | 4 +- .../loader/OrmQueryBatchLoaderProvider.java | 2 +- .../OrmBatchRecordSnapshotBuilder.java | 5 +- nop-cli/demo/_vfs/batch/batch-demo.task.xml | 2 + .../concurrent/executor/GlobalExecutors.java | 4 + .../io/nop/excel/imp/ImportExcelParser.java | 4 +- .../resources/_vfs/nop/schema/task/batch.xdef | 6 + 41 files changed, 611 insertions(+), 122 deletions(-) create mode 100644 nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchConfigs.java rename nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/{BatchTaskExecution.java => BatchTask.java} (94%) diff --git a/docs/theory/why-springbatch-is-bad.md b/docs/theory/why-springbatch-is-bad.md index e5f0c9424..62fcee333 100644 --- a/docs/theory/why-springbatch-is-bad.md +++ b/docs/theory/why-springbatch-is-bad.md @@ -133,7 +133,7 @@ class JdbcPagingItemReader implements ItemReader { } ``` -如果Reader读取的不是简单的平面结构记录,而是一个复杂的业务对象。那么如果要实现属性的批量加载,就必须修改Reader的实现代码,导致了代码的耦合性增加。 +这种设计不仅使得Reader必须持有临时状态变量,而且也使得批量优化难以在外部进行。如果Reader读取的不是简单的平面结构记录,而是一个复杂的业务对象,那么如果要实现属性的批量加载,就必须修改Reader的实现代码,导致了代码的耦合性增加。 NopBatch中使用IBatchLoader接口来实现批量加载,可以更好的支持批量读取优化。 @@ -157,17 +157,38 @@ public interface IBatchLoader { ```javascript List data = loader.load(batchSize, context); -batchLoadRelatedData(data, context); // 批量加载其他相关数据 + +// 批量加载其他相关数据,加载的数据可以放到context中,也可以作为data中元素的扩展字段 +batchLoadRelatedData(data, context); ``` 当处理数据需要获取互斥锁的时候,SpringBatch的设计也显得非常不友好。因为SpringBatch的ItemReader是逐条读取的,导致获取锁的时候无法进行批量优化,并且获取锁的顺序也难以控制,存在死锁风险。 而NopBatch的设计可以先按照某种规则对记录进行排序(不要求reader读取时整体排序),然后一次性获取所有需要的锁,这样就可以避免死锁风险。 +总而言之,SpringBatch的设计体现出Item Oriented的遗迹,导致Chunk级别的处理很不自然。 + +> Chunk的概念是在Spring2.0中引入的,最早的时候SpringBatch只有Item的概念。 + +SpringBatch 1.0中的ItemWriter接口定义如下: + +```java +public interface ItemWriter { + + public void write(Object item) throws Exception; + + public void flush() throws FlushFailedException; + + public void clear() throws ClearFailedException; +} +``` + ### 2.2 Processor的每次调用不应该只返回一条记录 -SpringBatch的处理逻辑类似于函数式编程中的map函数,`data.map(a->b)`,对每一条输入记录进行处理,返回一个输出记录。 这里很自然的就会产生一个疑问,为什么一次处理最多只会产生一个输出?不能一次处理产生多个输出吗? +SpringBatch中Processor的处理逻辑类似于函数式编程中的map函数,`data.map(a->b)`,对每一条输入记录进行处理,返回一个输出记录。 这里很自然的就会产生一个疑问,为什么一次处理最多只会产生一个输出?不能一次处理产生多个输出吗? -现代的流处理框架的语义更接近于函数式编程中的flatMap函数, `data.flatMap(a->[b])`。也就是说,一次处理,可以有三种结果:A. 没有输出 B. 产生一个输出 C. 产生多个输出。如果一次处理可以产生多个输出,那么能不能每产生一个输出就交给下游进行处理,不用等待当前所有输出都产生之后再传递到下游? +现代的流处理框架的语义更接近于函数式编程中的flatMap函数, `data.flatMap(a->[b])`。也就是说,一次处理,可以有三种结果:A. 没有输出 B. 产生一个输出 C. 产生多个输出。 + +流式处理模式:如果一次处理可以产生多个输出,那么能不能每产生一个输出就交给下游进行处理,不用等待当前所有输出都产生之后再传递到下游? NopBatch仿照流处理框架,定义了如下处理接口 @@ -199,9 +220,9 @@ public interface IBatchProcessor { * IBatchProcessor接口还提供了一个then函数,可以将两个IBatchProcessor组合为一个整体的Processor,形成一种链式调用。这其实是类似函数式编程中Monad概念的一种应用。 -### 2.3 Writer命名不合适 +### 2.3 Writer接收Collection类型数据即可 -SpringBatch中的ItemWriter从命名上看是用于消费Processor产生的结果数据,这样就导致在概念层面上固化了Read-Process-Write的处理流程。但有很多情况下我们并不需要写出结果,只需要消费输入的数据而已。 +首先SpringBatch中ItemWriter的命名不太合适。ItemWriter从命名上看是用于消费Processor产生的结果数据,这样就导致在概念层面上固化了Read-Process-Write的处理流程。但有很多情况下我们并不需要写出结果,只需要消费输入的数据而已。 NopBatch引入了通用的BatchConsumer概念,使得BatchConsumer和BatchLoader构成一对对偶的接口,BatchLoader加载的数据直接传递给BatchConsumer进行消费。 @@ -211,16 +232,16 @@ public interface IBatchConsumer { * @param items 待处理的对象集合 * @param context 上下文对象 */ - void consume(List items, IBatchChunkContext context); + void consume(Collection items, IBatchChunkContext context); } ``` Chunk的处理流程变得非常简单 -``` +```javascript List items = loader.load(batchSize,context); if(items == null || items.isEmpty()) - return STOP; + return ProcessingResult.STOP; consumer.consume(items,context); ``` @@ -230,16 +251,41 @@ Processor可以看作是一种可选的Consumer实现方案 public class BatchProcessorConsumer implements IBatchConsumer { @Override - public void consume(List items, IBatchChunkContext context) { - List collector = new ArrayList<>(); + public void consume(Collection items, IBatchChunkContext context) { + List outputs = new ArrayList<>(); for(S item: items){ - processor.process(item, collector::add, context); + processor.process(item, outputs::add, context); } - consumer.consume(collector, context); + consumer.consume(outputs, context); } } ``` +SpringBatch中的Chunk结构定义如下: + +```java +class Chunk implements Iterable, Serializable { + + private List items = new ArrayList<>(); + + private List> skips = new ArrayList<>(); + + private final List errors = new ArrayList<>(); + + private Object userData; + + private boolean end; + + private boolean busy; +} +``` + +Chunk结构中包含多种信息,但是在Processor和Reader中却不能直接访问Chunk结构,造成不必要的复杂性。 + +在NopBatch的架构中,Loader/Processor/Consumer接口都接受同样的IBatchChunkContext参数,通过它可以实现相互协调。同时在IBatchConsumer接口中,items以Collection类型传递即可,没有必要强制要求使用List类型。 + +> 当异步执行Processor的情况下,items中会使用ConcurrentLinkedQueue来保存。 + ### 2.4 事务处理机制不灵活 SpringBatch强制限定了一个Chunk的Read-Process-Write在一个事务中执行。但是在Nop平台中,业务实体一般都具有乐观锁版本字段,而且在OrmSession中会缓存所有实体对象,这使得我们可以选择仅在Write阶段打开事务,从而缩小事务影响范围,减少数据库连接池的占用时间。 @@ -420,7 +466,7 @@ class MyComponent extends Vue { 核心的设计思想都是在组件上实现生命周期监听函数,框架在创建这些组件的时候注册对应的事件监听器,然后利用组件对象的成员变量来实现多个回调函数之间的信息传递和组织。 -前端领域后来出现了一个革命性的进展,就是引入了所谓的Hooks机制,抛弃了Class Based的组件方案。参见[从React Hooks看React的本质](https://mp.weixin.qq.com/s/-n5On67e3_46zH6ppPlkTA) +前端领域后来出现了一个革命性的进展,就是引入了所谓的Hooks机制,抛弃了Class Based的组件方案。参见我的公众号文章[从React Hooks看React的本质](https://mp.weixin.qq.com/s/-n5On67e3_46zH6ppPlkTA) 在Hooks方案下,前端组件退化为一个响应式的render函数,考虑到一次性的初始化过程,Vue选择将组件抽象为render函数的构造器。 @@ -454,6 +500,8 @@ Hooks方案相比于传统的类组件方案有如下优点: 2. 多个事件监听函数之间可以通过闭包传递信息,而不需要再通过this指针迂回。 +3. 可以根据传入的参数动态决定是否注册事件监听器。 + 这里的关键性的架构变化是提供了一种全局的、动态事件注册机制,而不是将事件监听函数与某个对象指针绑定,必须是某个对象的成员函数。 类似于Hooks方案,NopBatch将核心抽象从IBatchLoader这种运行组件变更为IBatchLoaderProvider这种工厂组件,它提供一个setup方法来创建IBatchLoader。 @@ -641,7 +689,7 @@ SpringBatch的关键特性描述中强调了可重用性和可扩展性,但是 SpringBatch的job配置可以看作是一种非常简易且不通用的逻辑流编排机制,它只能编排批处理任务,不能作为一个通用的逻辑流编排引擎来使用。在NopBatch框架中我们明确将逻辑流编排从批处理引擎中剥离出来,使用通用的NopTaskFlow来编排逻辑,而NopBatch只负责一个流程步骤中的Chunk处理。这使得NopTaskFlow和NopBatch的设计都变得非常简单直接,它们的实现代码远比SpringBatch要简单(只有几千行代码),且具有非常强大的扩展能力。在NopTaskFlow和NopBatch中做的工作都可以应用到更加通用的场景中。 -NopTaskFlow是根据可逆计算原理从零开始构建的下一代逻辑流编排框架,它的核心抽象是支持Decorator和状态持久化的RichFunction。它的性能很高并且非常轻量级,可以用在所有需要进行函数配置化分解的地方。详细介绍参见[从零开始编写的下一代逻辑编排引擎 NopTaskFlow](https://mp.weixin.qq.com/s/2mFC0nQon_l2M82tOlJVhg) +NopTaskFlow是根据可逆计算原理从零开始构建的下一代逻辑流编排框架,它的核心抽象是支持Decorator和状态持久化的RichFunction。它的性能很高并且非常轻量级(核心只有3000行左右代码),可以用在所有需要进行函数配置化分解的地方。详细介绍参见[从零开始编写的下一代逻辑编排引擎 NopTaskFlow](https://mp.weixin.qq.com/s/2mFC0nQon_l2M82tOlJVhg) 在NopTaskFlow中实现与上面SpringBatch Job等价的配置 @@ -780,7 +828,7 @@ const {RESULT} = step1(x+1,y+2) ### 3.3 支持工作共享的分区并行处理 -SpringBatch提供将数据拆分成多个分区,并分配给多个从属步骤(slave steps)来实现并行处理的机制。以下是分区并行处理的主要步骤和组件: +SpringBatch提供了将数据拆分成多个分区,并分配给多个从属步骤(slave steps)来实现并行处理的机制。以下是分区并行处理的主要步骤和组件: 1. **定义分区器(Partitioner)**: @@ -803,7 +851,8 @@ SpringBatch提供将数据拆分成多个分区,并分配给多个从属步骤 ```xml - + + @@ -885,6 +934,31 @@ NopBatch内置了一个PartitionDispatchLoaderProvider,它提供了一种灵 每个处理线程去加载chunk数据的时候,可以从PartitionDispatchQueue中的微队列中获取数据,每次获取到数据后就标记对应的微队列已经被使用,阻止其他线程去处理同样的微队列。当chunk处理完毕之后,会在onChunkEnd回调函数中释放对应的微队列。 +```xml + + + + + + + + + + + + for(let item of items){ + item.make_t().partitionIndex = ...; // 动态计算得到partitionIndex + } + + + +``` + +> Nop平台中的所有实体都提供了make_t()函数,它返回一个Map,可以用于保存自定义临时属性。这一设计也符合可逆计算每个局部都具有扩展能力的设计理念。 + +上面是NopBatch DSL的一个配置片段,它采用OrmReader读取DemoIncomingTxn表中的数据,然后按照实体上`_t.partitionIndex`的配置投递到不同的队列。 + + 在SpringBatch中每个线程对应一个分区,分区的个数等于线程的个数。而在NopBatch中实际分区的个数最大为32768,它远大于批处理任务的并行线程数,同时又远小于实际业务实体数,可以保证分区比较均衡同时又不需要在内存中维护太多的队列。 如果确实需要类似SpringBatch的步骤级别的并行处理能力,可以直接使用NopTaskFlow中的fork或者fork-n步骤配置。 @@ -908,13 +982,39 @@ NopBatch内置了一个PartitionDispatchLoaderProvider,它提供了一种灵 fork步骤的producer可以动态计算得到一个列表,然后针对其中的每个元素会启动一个单独的步骤实例。 +NopBatch DSL中的OrmReader和JdbcReader都支持partitionIndexField配置,如果指定了这个分区字段,且传入partitionRange参数,则会自动生成分区过滤条件。 + +```xml + + + + + + + + + +``` + +```javascript +batchTaskContext.setPartitionRange(IntRangeBean.of(1000,100)); +``` + +上面的配置在执行时会生成如下SQL语句 + +```sql +select o from MyEntity o +where o.status = '1' +and o.partitionIndex between 1000 and (1000 + 100 - 1) +``` + ## 三. DSL森林: NopTaskFlow + NopBatch + NopRecord + NopORM SpringBatch虽然号称是声明式开发,但是它的声明式是利用Spring IoC有限的Bean组装描述,大量的业务相关内容仍然是需要写在Java代码中,并没有建立一个完整的能够实现细粒度的声明式开发的批处理模型。另外一方面,如果SpringBatch真的提出一个专用于批处理的领域特定模型,似乎又难以保证它的可扩展性,有可能会限制它的应用范围。 NopBatch所提供的解决方案是一个非常具有Nop平台特色的解决方案,也就是所谓的DSL森林:通过复用一组无缝嵌套在一起的、适用于不同局部领域的DSL来解决问题,而不是依靠一个单一的、大而全的、专门针对批处理设计的DSL。针对批处理,我们只建立一个最小化的NopBatch批处理模型,它负责抽象Batch领域特定的Chunk处理逻辑,并提供一系列的辅助实现类,比如PartitionDispatcherQueue。在更宏观的任务编排层面上,我们复用已有的NopTaskFlow来实现。NopTaskFlow完全不具备批处理相关的知识,也不需要为了与NopBatch集成在一起在引擎内部做任何适应性改造,而是通过元编程抹平两者之间融合所产生的一切沟沟坎坎。 -在文件解析层面,SpringBatch提供了一个FlatFileItemReader,通过它可以进行一系列的配置来实现对简单结构的数据文件实现解析。 +举例来说,在文件解析层面,SpringBatch提供了一个FlatFileItemReader,通过它可以进行一系列的配置来实现对简单结构的数据文件实现解析。 ```xml @@ -960,11 +1060,157 @@ NopBatch所提供的解决方案是一个非常具有Nop平台特色的解决方 在Nop平台中,我们定义了一种专用于数据消息格式解析和生成的Record模型,但它并不是为批处理文件解析专门设计,而是可以用于所有需要消息解析和生成的地方,是一种通用的声明式开发机制,而且能力远比SpringBatch中的FlatFile配置强大。 -在数据库存取方面,NopORM提供了完整的ORM模型支持,内置多租户、逻辑删除、字段加解密、柔性事务处理、数据关联查询、批量加载和批量保存优化等完善的数据访问层能力。 +```xml + + + + + + + + + + + + + + + + + + + + bizDate + + + + + + + + + consume(item); + + + + + + + + + + + + + + return item.quantity > 500; + + + + + + + + + +``` -结合NopTaskFlow、NopBatch、NopRecord和NopORM等多个领域模型,Nop平台就可以做到在一般业务开发时完全通过声明式的方式实现批处理任务,而不需要编写Java代码。 +在上面的示例中,演示了在NopTaskFlow中如何无缝嵌入Batch批处理模型和Record消息格式定义。 + +1. NopTaskFlow逻辑编排引擎在设计的时候并没有任何关于批处理任务的知识,也没有内置Record模型。 +2. 扩展NopTaskFlow并不需要实现某个NopTaskFlow引擎内部的扩展接口,也不需要使用NopTaskFlow内部的某种注册机制注册扩展步骤。 +3. 只需要查看`task.xdef`元模型,了解NopTaskFlow逻辑编排模型的节点结构,就可以使用XLang语言内置的元编程机制实现扩展。 +4. `x:extends="/nop/task/lib/common.task.xml,/nop/task/lib/batch-common.task.xml"`引入了基础模型支持,这些基础模型通过`x:post-extends`等元编程机制在XNode结构层对当前模型进行结构变换。 +5. `` 扩展节点的customType会被自动识别为Xpl 标签函数,并将custom节点变换为对``标签函数的调用。 ```xml - - + + ... + +会被自动变换为 + + + + + ... + + + +``` + +也就是说,customType是具有名字空间的标签函数名。所有具有相同名字空间的属性和子节点都会作为该标签的属性和子节点。 + +7. ``标签会在编译期解析自己的task节点,构造出IBatchTaskBuilder,在运行期可以直接获取到编译期变量,不用再重复解析。 +8. 所有的XDSL都自动支持扩展属性和扩展节点,缺省情况下带名字空间的属性和节点不会参与XDef元模型检查。所以在task节点下可以引入自定义的``模型定义,它会被`batch-common.task.xml`引入的元编程处理器自动解析为RecordFileMeta模型对象,并保存为编译期的一个变量。 +9. `file-reader`和 `file-writer`节点上的`record:file-model`属性会被识别,并自动转换。 + +```xml + + +被变换为 + + + + + + + +``` + +10. NopTaskFlow在某个步骤中调用BatchTask,在BatchTask的Processor中我们可以使用同样的方式来调用NopTaskFlow来实现针对单条记录的处理逻辑。 + +```xml + + + +会被变换为 + + + + + ``` + +11. 在数据库存取方面,NopORM提供了完整的ORM模型支持,内置多租户、逻辑删除、字段加解密、柔性事务处理、数据关联查询、批量加载和批量保存优化等完善的数据访问层能力。通过orm-reader和orm-writer可以实现数据库读写。 + +```xml + + + + + + + + + const data = { + txnTime: item.txnTime, + txnAmount: item.txnAmount, + txnType: item.txnType, + cardNumber: item.cardNumber + } + consume(data) + + + + + + + + +``` + + +**结合NopTaskFlow、NopBatch、NopRecord和NopORM等多个领域模型,Nop平台就可以做到在一般业务开发时完全通过声明式的方式实现批处理任务,而不需要编写Java代码**。 + + +## 四. DSL的多重表象 + diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchConfigs.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchConfigs.java new file mode 100644 index 000000000..4e810a77c --- /dev/null +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchConfigs.java @@ -0,0 +1,17 @@ +package io.nop.batch.core; + +import io.nop.api.core.annotations.core.Description; +import io.nop.api.core.config.IConfigReference; +import io.nop.api.core.util.SourceLocation; + +import java.time.Duration; + +import static io.nop.api.core.config.AppConfig.varRef; + +public interface BatchConfigs { + SourceLocation s_loc = SourceLocation.fromClass(BatchConfigs.class); + + @Description("异步Processor执行的缺省超时时间") + IConfigReference CFG_BATCH_ASYNC_PROCESS_TIMEOUT = varRef(s_loc, "nop.batch.async-process-timeout", + Duration.class, Duration.ofMinutes(10)); // 缺省为10分钟 +} diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchConstants.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchConstants.java index 5006d2cc3..33701db99 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchConstants.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchConstants.java @@ -16,6 +16,8 @@ public interface BatchConstants { String DEFAULT_METER_PREFIX = "nop."; + String SNAPSHOT_BUILDER_ORM_ENTITY = "orm-entity"; + String METER_TASK = "batch.task"; String METER_CHUNK = "batch.chunk"; String METER_LOAD = "batch.load"; diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchTaskBuilder.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchTaskBuilder.java index aaf0c82b5..8a08e4f72 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchTaskBuilder.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/BatchTaskBuilder.java @@ -19,7 +19,7 @@ import io.nop.batch.core.consumer.RetryBatchConsumer; import io.nop.batch.core.consumer.SkipBatchConsumer; import io.nop.batch.core.consumer.WithHistoryBatchConsumer; -import io.nop.batch.core.impl.BatchTaskExecution; +import io.nop.batch.core.impl.BatchTask; import io.nop.batch.core.loader.ChunkSortBatchLoader; import io.nop.batch.core.loader.InvokerBatchLoader; import io.nop.batch.core.loader.PartitionDispatchLoaderProvider; @@ -27,7 +27,6 @@ import io.nop.batch.core.processor.BatchChunkProcessor; import io.nop.batch.core.processor.BatchSequentialProcessor; import io.nop.batch.core.processor.InvokerBatchChunkProcessor; -import io.nop.commons.concurrent.executor.ExecutorHelper; import io.nop.commons.concurrent.executor.GlobalExecutors; import io.nop.commons.concurrent.ratelimit.DefaultRateLimiter; import io.nop.commons.functional.IFunctionInvoker; @@ -50,6 +49,8 @@ public class BatchTaskBuilder implements IBatchTaskBuilder { private IBatchConsumerProvider consumer; private boolean useBatchRequestGenerator; private IBatchProcessorProvider processor; + private boolean asyncProcessor; + private long asyncProcessTimeout; private int batchSize = 100; /** @@ -69,7 +70,7 @@ public class BatchTaskBuilder implements IBatchTaskBuilder { private boolean singleSession; private BatchTransactionScope batchTransactionScope = BatchTransactionScope.consume; - private Executor executor = ExecutorHelper.syncExecutor(); + private Executor executor; private IBatchStateStore stateStore; private Boolean allowStartIfComplete; @@ -156,6 +157,18 @@ public BatchTaskBuilder taskKeyExpr(IEvalFunction expr) { return this; } + @PropertySetter + public BatchTaskBuilder asyncProcessor(boolean asyncProcessor) { + this.asyncProcessor = asyncProcessor; + return this; + } + + @PropertySetter + public BatchTaskBuilder asyncProcessTimeout(long asyncProcessTimeout) { + this.asyncProcessTimeout = asyncProcessTimeout; + return this; + } + public void addTaskInitializer(Consumer initializer) { if (taskInitializers == null) taskInitializers = new ArrayList<>(); @@ -310,7 +323,16 @@ public IBatchTask buildTask(IBatchTaskContext context) { if (context.getStartLimit() <= 0) context.setStartLimit(startLimit); - return new BatchTaskExecution(taskName, taskVersion == null ? 0 : taskVersion, taskKeyExpr, + Executor executor = this.executor; + if (executor == null) { + if (concurrency > 0) { + executor = GlobalExecutors.cachedThreadPool(); + } else { + executor = GlobalExecutors.syncExecutor(); + } + } + + return new BatchTask(taskName, taskVersion == null ? 0 : taskVersion, taskKeyExpr, executor, concurrency, taskInitializers, this::buildLoader, this::buildChunkProcessor, stateStore); } @@ -367,7 +389,7 @@ protected IBatchChunkProcessorProvider.IBatchChunkProcessor buildChunkProcess if (useBatchRequestGenerator) { processor = new BatchSequentialProcessor(processor); } - consumer = new BatchProcessorConsumer<>(processor, (IBatchConsumer) consumer); + consumer = new BatchProcessorConsumer<>(processor, (IBatchConsumer) consumer, asyncProcessor, asyncProcessTimeout); } // 保存处理历史,避免重复处理 diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchChunkContext.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchChunkContext.java index 86b5c8304..9ca06f64a 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchChunkContext.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchChunkContext.java @@ -12,6 +12,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; /** * 批处理的一个执行单元。例如100条记录组成一个chunk,一个chunk全部执行完毕之后才提交一次,而不是每处理一条记录就提交一次事务。 @@ -86,4 +87,10 @@ default boolean isRetrying() { boolean isSingleMode(); void setSingleMode(boolean singleMode); + + void initChunkLatch(CountDownLatch latch); + + CountDownLatch getChunkLatch(); + + void countDown(); } \ No newline at end of file diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchConsumerProvider.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchConsumerProvider.java index 38c1cc0b9..69fe67f74 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchConsumerProvider.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchConsumerProvider.java @@ -2,7 +2,7 @@ import io.nop.batch.core.consumer.FilteredBatchConsumer; -import java.util.List; +import java.util.Collection; public interface IBatchConsumerProvider { @@ -25,6 +25,6 @@ interface IBatchConsumer { * @param items 待处理的对象集合 * @param context 上下文对象 */ - void consume(List items, IBatchChunkContext context); + void consume(Collection items, IBatchChunkContext context); } } diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchRecordHistoryStore.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchRecordHistoryStore.java index 1d21f29bd..9b8426954 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchRecordHistoryStore.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchRecordHistoryStore.java @@ -7,7 +7,7 @@ */ package io.nop.batch.core; -import java.util.List; +import java.util.Collection; /** * 用于记录已处理过的记录,避免重复处理。一般和处理函数在一个事务中,确保成功处理时一定会保存处理记录 @@ -19,7 +19,7 @@ public interface IBatchRecordHistoryStore { * @param records 待处理的记录列表 * @param context 任务上下文 */ - List filterProcessed(List records, IBatchChunkContext context); + Collection filterProcessed(Collection records, IBatchChunkContext context); - void saveProcessed(List filtered, Throwable exception, IBatchChunkContext context); + void saveProcessed(Collection filtered, Throwable exception, IBatchChunkContext context); } \ No newline at end of file diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchRecordSnapshotBuilder.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchRecordSnapshotBuilder.java index bd5306c68..f61a73c64 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchRecordSnapshotBuilder.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchRecordSnapshotBuilder.java @@ -1,13 +1,13 @@ package io.nop.batch.core; -import java.util.List; +import java.util.Collection; public interface IBatchRecordSnapshotBuilder { interface ISnapshot { - List restore(List items, IBatchChunkContext chunkContext); + Collection restore(Collection items, IBatchChunkContext chunkContext); void onError(Throwable e); } - ISnapshot buildSnapshot(List items, IBatchChunkContext chunkContext); + ISnapshot buildSnapshot(Collection items, IBatchChunkContext chunkContext); } diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchTaskContext.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchTaskContext.java index 04e02395a..3e66b9b59 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchTaskContext.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/IBatchTaskContext.java @@ -13,7 +13,7 @@ import io.nop.core.context.IServiceContext; import io.nop.core.utils.IVarSet; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -84,9 +84,9 @@ default void setPersistVar(String name, Object value) { /** * 本次任务处理所涉及到的数据分区 */ - IntRangeBean getPartition(); + IntRangeBean getPartitionRange(); - void setPartition(IntRangeBean partition); + void setPartitionRange(IntRangeBean partitionRange); boolean isRecoverMode(); @@ -145,7 +145,7 @@ default void setPersistVar(String name, Object value) { void onChunkEnd(BiConsumer action); - void onChunkTryBegin(BiConsumer, IBatchChunkContext> action); + void onChunkTryBegin(BiConsumer, IBatchChunkContext> action); void onChunkTryEnd(BiConsumer action); @@ -153,7 +153,7 @@ default void setPersistVar(String name, Object value) { void onLoadEnd(BiConsumer action); - void onConsumeBegin(BiConsumer, IBatchChunkContext> action); + void onConsumeBegin(BiConsumer, IBatchChunkContext> action); void onConsumeEnd(BiConsumer action); @@ -170,11 +170,11 @@ default void setPersistVar(String name, Object value) { void fireChunkEnd(IBatchChunkContext chunkContext, Throwable err); - void fireChunkTryBegin(List items, IBatchChunkContext chunkContext); + void fireChunkTryBegin(Collection items, IBatchChunkContext chunkContext); void fireChunkTryEnd(IBatchChunkContext chunkContext, Throwable err); - void fireConsumeBegin(List items, IBatchChunkContext chunkContext); + void fireConsumeBegin(Collection items, IBatchChunkContext chunkContext); void fireConsumeEnd(IBatchChunkContext chunkContext, Throwable err); } \ No newline at end of file diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/BatchProcessorConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/BatchProcessorConsumer.java index a32b21e7a..e0fe218cc 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/BatchProcessorConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/BatchProcessorConsumer.java @@ -7,6 +7,9 @@ */ package io.nop.batch.core.consumer; +import io.nop.api.core.exceptions.NopException; +import io.nop.api.core.exceptions.NopTimeoutException; +import io.nop.api.core.util.ICancellable; import io.nop.batch.core.IBatchChunkContext; import io.nop.batch.core.IBatchConsumerProvider.IBatchConsumer; import io.nop.batch.core.IBatchProcessorProvider.IBatchProcessor; @@ -14,8 +17,12 @@ import io.nop.batch.core.exceptions.BatchCancelException; import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import static io.nop.batch.core.BatchConfigs.CFG_BATCH_ASYNC_PROCESS_TIMEOUT; import static io.nop.batch.core.BatchErrors.ERR_BATCH_CANCEL_PROCESS; /** @@ -27,55 +34,94 @@ public class BatchProcessorConsumer implements IBatchConsumer { private final IBatchProcessor processor; private final IBatchConsumer consumer; + private final boolean async; + private final long asyncProcessTimeout; public BatchProcessorConsumer(IBatchProcessor processor, - IBatchConsumer consumer) { + IBatchConsumer consumer, boolean async, long asyncProcessTimeout) { this.processor = processor; this.consumer = consumer; + this.async = async; + this.asyncProcessTimeout = asyncProcessTimeout; + } + + public BatchProcessorConsumer(IBatchProcessor processor, IBatchConsumer consumer) { + this(processor, consumer, false, 0L); } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext batchChunkCtx) { - IBatchTaskMetrics metrics = context.getTaskContext().getMetrics(); + IBatchTaskMetrics metrics = batchChunkCtx.getTaskContext().getMetrics(); + + if (async) { + batchChunkCtx.initChunkLatch(new CountDownLatch(items.size())); + } // 假定为同步处理模型。这里缓存所有输出数据,至于当整个列表中的元素都被成功消费以后,才会处理输出数据 - List collector = new ArrayList<>(); + Collection outputs = newOutputs(); for (S item : items) { Object meter = metrics == null ? null : metrics.beginProcess(); boolean success = false; try { - context.incProcessCount(); - context.getTaskContext().incProcessItemCount(1); - processor.process(item, collector::add, context); + batchChunkCtx.incProcessCount(); + batchChunkCtx.getTaskContext().incProcessItemCount(1); + // processor内部可能异步执行。如果是异步执行,执行完毕后需要调用batchChunkCtx.countDown() + processor.process(item, outputs::add, batchChunkCtx); success = true; } finally { if (metrics != null) metrics.endProcess(meter, success); } - if (context.getTaskContext().isCancelled()) + if (batchChunkCtx.getTaskContext().isCancelled()) throw new BatchCancelException(ERR_BATCH_CANCEL_PROCESS); - if (context.isCancelled()) + if (batchChunkCtx.isCancelled()) throw new BatchCancelException(ERR_BATCH_CANCEL_PROCESS); } - consumeResult(collector, context); + if (async) { + try { + long timeout = asyncProcessTimeout; + if (timeout <= 0) + timeout = CFG_BATCH_ASYNC_PROCESS_TIMEOUT.get().toMillis(); + + if (!batchChunkCtx.getChunkLatch().await(timeout, TimeUnit.MILLISECONDS)) { + batchChunkCtx.cancel(ICancellable.CANCEL_REASON_TIMEOUT); + throw new NopTimeoutException(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw NopException.adapt(e); + } + } + + // 即使是空集合也执行一次consumer,有可能会触发orm flush + consumeResult(outputs, batchChunkCtx); + } + + protected Collection newOutputs() { + return async ? new ConcurrentLinkedQueue<>() : new ArrayList<>(); } - void consumeResult(List collector, IBatchChunkContext context) { + void consumeResult(Collection outputs, IBatchChunkContext context) { IBatchTaskMetrics metrics = context.getTaskContext().getMetrics(); - Object meter = metrics == null ? null : metrics.beginConsume(collector.size()); + Object meter = metrics == null ? null : metrics.beginConsume(outputs.size()); boolean success = false; try { - consumer.consume(collector, context); + context.getTaskContext().fireConsumeBegin(outputs, context); + consumer.consume(outputs, context); + context.getTaskContext().fireConsumeEnd(context, null); success = true; + } catch (Exception e) { + context.getTaskContext().fireConsumeEnd(context, e); + throw NopException.adapt(e); } finally { if (metrics != null) { - metrics.endConsume(meter, collector.size(), success); + metrics.endConsume(meter, outputs.size(), success); } } } diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/BlockingSinkBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/BlockingSinkBatchConsumer.java index 754841abb..da729c8a8 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/BlockingSinkBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/BlockingSinkBatchConsumer.java @@ -14,7 +14,7 @@ import io.nop.batch.core.IBatchTaskContext; import io.nop.commons.concurrent.IBlockingSink; -import java.util.List; +import java.util.Collection; public class BlockingSinkBatchConsumer implements IBatchConsumer, IBatchConsumerProvider { private IBlockingSink sink; @@ -33,7 +33,7 @@ public IBatchConsumer setup(IBatchTaskContext context) { } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { try { sink.sendMulti(items); } catch (InterruptedException e) { diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/EmptyBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/EmptyBatchConsumer.java index 8265262be..15746d92a 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/EmptyBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/EmptyBatchConsumer.java @@ -13,7 +13,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; +import java.util.Collection; public class EmptyBatchConsumer implements IBatchConsumerProvider.IBatchConsumer, IBatchConsumerProvider { static final Logger LOG = LoggerFactory.getLogger(EmptyBatchConsumer.class); @@ -30,7 +30,7 @@ public IBatchConsumer setup(IBatchTaskContext context) { } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { LOG.debug("batch.consumer.ignore:items={}", items); } } \ No newline at end of file diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/FilteredBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/FilteredBatchConsumer.java index 2e2f537bf..fe2040e18 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/FilteredBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/FilteredBatchConsumer.java @@ -5,6 +5,7 @@ import io.nop.batch.core.IBatchConsumerProvider.IBatchConsumer; import io.nop.batch.core.IBatchRecordFilter; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @@ -19,7 +20,7 @@ public FilteredBatchConsumer(IBatchRecordFilter filter, } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { List filtered = items.stream().filter(item -> filter.accept(item, context)) .collect(Collectors.toList()); if (!filtered.isEmpty()) { diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/InvokerBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/InvokerBatchConsumer.java index 78fb470b2..84f9f88ea 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/InvokerBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/InvokerBatchConsumer.java @@ -11,7 +11,7 @@ import io.nop.batch.core.IBatchConsumerProvider.IBatchConsumer; import io.nop.commons.functional.IFunctionInvoker; -import java.util.List; +import java.util.Collection; public class InvokerBatchConsumer implements IBatchConsumer { private final IFunctionInvoker invoker; @@ -23,7 +23,7 @@ public InvokerBatchConsumer(IFunctionInvoker invoker, IBatchConsumer consumer } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { invoker.invoke(c -> { consumer.consume(items, context); return null; diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/MultiBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/MultiBatchConsumer.java index dc7e1ca8e..0c15e867b 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/MultiBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/MultiBatchConsumer.java @@ -10,6 +10,7 @@ import io.nop.batch.core.IBatchChunkContext; import io.nop.batch.core.IBatchConsumerProvider.IBatchConsumer; +import java.util.Collection; import java.util.List; public final class MultiBatchConsumer implements IBatchConsumer { @@ -36,7 +37,7 @@ public IBatchConsumer first() { } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { for (IBatchConsumer consumer : list) { consumer.consume(items, context); } diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/RateLimitConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/RateLimitConsumer.java index 4d44198f1..b31c03bd3 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/RateLimitConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/RateLimitConsumer.java @@ -11,7 +11,7 @@ import io.nop.batch.core.IBatchConsumerProvider.IBatchConsumer; import io.nop.commons.concurrent.ratelimit.IRateLimiter; -import java.util.List; +import java.util.Collection; public class RateLimitConsumer implements IBatchConsumer { static final long RATE_LIMIT_TIMEOUT = 1000 * 60 * 20L; // 20分钟 @@ -25,7 +25,7 @@ public RateLimitConsumer(IBatchConsumer consumer, IRateLimiter rateLimiter) { } @Override - public void consume(List items, IBatchChunkContext chunkContext) { + public void consume(Collection items, IBatchChunkContext chunkContext) { rateLimiter.tryAcquire(chunkContext.getChunkItems().size(), RATE_LIMIT_TIMEOUT); consumer.consume(items, chunkContext); } diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/ResourceRecordConsumerProvider.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/ResourceRecordConsumerProvider.java index 14c7fcd77..022905875 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/ResourceRecordConsumerProvider.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/ResourceRecordConsumerProvider.java @@ -20,6 +20,7 @@ import io.nop.core.resource.record.IResourceRecordOutputProvider; import io.nop.dataset.record.IRecordOutput; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -139,7 +140,7 @@ ConsumerState newConsumerState(IBatchTaskContext context) { return state; } - void consume(List items, ConsumerState state) { + void consume(Collection items, ConsumerState state) { if (items.isEmpty()) return; diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/RetryBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/RetryBatchConsumer.java index abf4a1075..5b768f676 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/RetryBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/RetryBatchConsumer.java @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -46,7 +47,7 @@ public RetryBatchConsumer(IBatchConsumer consumer, IRetryPolicy items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { IBatchRecordSnapshotBuilder.ISnapshot snapshot = snapshotBuilder == null ? null : snapshotBuilder.buildSnapshot(items, context); try { @@ -78,7 +79,7 @@ public void consume(List items, IBatchChunkContext context) { } } - void consumeSingle(List items, IBatchChunkContext context) { + void consumeSingle(Collection items, IBatchChunkContext context) { context.setSingleMode(true); for (R item : items) { @@ -88,7 +89,7 @@ void consumeSingle(List items, IBatchChunkContext context) { } } - void retryConsume(Throwable exception, List items, IBatchRecordSnapshotBuilder.ISnapshot snapshot, + void retryConsume(Throwable exception, Collection items, IBatchRecordSnapshotBuilder.ISnapshot snapshot, IBatchChunkContext context) { int retryCount = 0; Throwable fatalError = null; @@ -106,7 +107,7 @@ void retryConsume(Throwable exception, List items, IBatchRecordSnapshotBuilde } try { - List restoredItems = restoreItems(snapshot, items, context); + Collection restoredItems = restoreItems(snapshot, items, context); RetryOnceResult result = retryConsumeOnce(retryCount, restoredItems, context); if (result == null) { @@ -140,7 +141,7 @@ void retryConsume(Throwable exception, List items, IBatchRecordSnapshotBuilde throw NopException.adapt(fatalError); } - List restoreItems(IBatchRecordSnapshotBuilder.ISnapshot snapshot, List items, IBatchChunkContext chunkContext) { + Collection restoreItems(IBatchRecordSnapshotBuilder.ISnapshot snapshot, Collection items, IBatchChunkContext chunkContext) { if (snapshot == null) return items; return snapshot.restore(items, chunkContext); @@ -153,7 +154,7 @@ class RetryOnceResult { List retryItems; } - RetryOnceResult retryConsumeOnce(int retryCount, List items, IBatchChunkContext context) { + RetryOnceResult retryConsumeOnce(int retryCount, Collection items, IBatchChunkContext context) { IBatchTaskMetrics metrics = context.getTaskContext().getMetrics(); if (metrics != null) metrics.retry(items.size()); @@ -180,7 +181,7 @@ RetryOnceResult retryConsumeOnce(int retryCount, List items, IBatchChunkConte } } - RetryOnceResult retryConsumeOneByOne(int retryCount, List items, IBatchChunkContext context) { + RetryOnceResult retryConsumeOneByOne(int retryCount, Collection items, IBatchChunkContext context) { context.setSingleMode(true); List retryItems = new ArrayList<>(); diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/SkipBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/SkipBatchConsumer.java index ac56f21ee..c9c0f2bd5 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/SkipBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/SkipBatchConsumer.java @@ -16,7 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; +import java.util.Collection; /** * 消费失败之后允许忽略skipCount条记录。 @@ -33,7 +33,7 @@ public SkipBatchConsumer(IBatchConsumer consumer, BatchSkipPolicy skipPolicy) } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { IBatchTaskMetrics metrics = context.getTaskContext().getMetrics(); try { consumer.consume(items, context); diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/SplitBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/SplitBatchConsumer.java index e2be461bc..74f241dd2 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/SplitBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/SplitBatchConsumer.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -48,7 +49,7 @@ public IBatchConsumer setup(IBatchTaskContext context) { return (items, ctx) -> consume(items, ctx, activeConsumers); } - void consume(List items, IBatchChunkContext context, Map> activeConsumers) { + void consume(Collection items, IBatchChunkContext context, Map> activeConsumers) { MultiMapCollector collector = new MultiMapCollector<>(); splitter.splitMulti(items, collector, context); Map> map = collector.getResultMap(); diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/WithHistoryBatchConsumer.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/WithHistoryBatchConsumer.java index cbea591e8..1a659325d 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/WithHistoryBatchConsumer.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/WithHistoryBatchConsumer.java @@ -11,9 +11,9 @@ import io.nop.batch.core.IBatchChunkContext; import io.nop.batch.core.IBatchConsumerProvider.IBatchConsumer; import io.nop.batch.core.IBatchRecordHistoryStore; -import io.nop.commons.util.CollectionHelper; import java.util.ArrayList; +import java.util.Collection; import java.util.List; public class WithHistoryBatchConsumer implements IBatchConsumer { @@ -30,8 +30,8 @@ public WithHistoryBatchConsumer(IBatchRecordHistoryStore historyStore, } @Override - public void consume(List items, IBatchChunkContext context) { - List filtered = historyStore.filterProcessed(items, context); + public void consume(Collection items, IBatchChunkContext context) { + Collection filtered = historyStore.filterProcessed(items, context); if (!filtered.isEmpty()) { if (filtered.size() != items.size()) { if (historyConsumer != null) { diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchChunkContextImpl.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchChunkContextImpl.java index e6ac26770..0e5434c96 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchChunkContextImpl.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchChunkContextImpl.java @@ -16,6 +16,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; public class BatchChunkContextImpl extends ExecutionContextImpl implements IBatchChunkContext { private final IBatchTaskContext context; @@ -28,6 +29,8 @@ public class BatchChunkContextImpl extends ExecutionContextImpl implements IBatc private int concurrency; private int processCount; + private CountDownLatch latch; + public BatchChunkContextImpl(IBatchTaskContext context) { super(context.getEvalScope()); this.context = context; @@ -126,4 +129,18 @@ public boolean isSingleMode() { public void setSingleMode(boolean singleMode) { this.singleMode = singleMode; } + + @Override + public void initChunkLatch(CountDownLatch latch) { + this.latch = latch; + } + + public CountDownLatch getChunkLatch() { + return latch; + } + + @Override + public void countDown() { + latch.countDown(); + } } \ No newline at end of file diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTaskExecution.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTask.java similarity index 94% rename from nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTaskExecution.java rename to nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTask.java index 8b415269c..4abb0d230 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTaskExecution.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTask.java @@ -32,8 +32,8 @@ import static io.nop.batch.core.BatchErrors.ERR_BATCH_CANCEL_PROCESS; -public class BatchTaskExecution implements IBatchTask { - static final Logger LOG = LoggerFactory.getLogger(BatchTaskExecution.class); +public class BatchTask implements IBatchTask { + static final Logger LOG = LoggerFactory.getLogger(BatchTask.class); private final String taskName; private final long taskVersion; @@ -45,11 +45,11 @@ public class BatchTaskExecution implements IBatchTask { private final List> initializers; private final int concurrency; - public BatchTaskExecution(String taskName, long taskVersion, - IEvalFunction taskKeyExpr, Executor executor, int concurrency, - List> initializers, - IBatchLoaderProvider loaderProvider, - IBatchChunkProcessorProvider chunkProcessorProvider, IBatchStateStore stateStore) { + public BatchTask(String taskName, long taskVersion, + IEvalFunction taskKeyExpr, Executor executor, int concurrency, + List> initializers, + IBatchLoaderProvider loaderProvider, + IBatchChunkProcessorProvider chunkProcessorProvider, IBatchStateStore stateStore) { this.taskName = taskName; this.taskVersion = taskVersion; this.taskKeyExpr = taskKeyExpr; diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTaskContextImpl.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTaskContextImpl.java index a852952b2..59bda984d 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTaskContextImpl.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/impl/BatchTaskContextImpl.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -43,7 +44,7 @@ public class BatchTaskContextImpl extends ExecutionContextImpl implements IBatch private String taskKey; private Map params; private IVarSet persistVars = new MapVarSet(); - private IntRangeBean partition; + private IntRangeBean partitionRange; private boolean recoverMode; private IBatchTaskMetrics metrics; private Boolean allowStartIfComplete; @@ -65,13 +66,13 @@ public class BatchTaskContextImpl extends ExecutionContextImpl implements IBatch private final List> onChunkBegins = new CopyOnWriteArrayList<>(); private final List> onBeforeChunkEnds = new CopyOnWriteArrayList<>(); private final List> onChunkEnds = new CopyOnWriteArrayList<>(); - private final List, IBatchChunkContext>> onChunkTryBegin = new CopyOnWriteArrayList<>(); + private final List, IBatchChunkContext>> onChunkTryBegin = new CopyOnWriteArrayList<>(); private final List> onChunkTryEnd = new CopyOnWriteArrayList<>(); private final List> onLoadBegins = new CopyOnWriteArrayList<>(); private final List> onLoadEnds = new CopyOnWriteArrayList<>(); - private final List, IBatchChunkContext>> onConsumeBegin = new CopyOnWriteArrayList<>(); + private final List, IBatchChunkContext>> onConsumeBegin = new CopyOnWriteArrayList<>(); private final List> onConsumeEnd = new CopyOnWriteArrayList<>(); public BatchTaskContextImpl(IServiceContext svcCtx, IEvalScope scope) { @@ -183,13 +184,13 @@ public void setPersistVars(IVarSet vars) { } @Override - public IntRangeBean getPartition() { - return partition; + public IntRangeBean getPartitionRange() { + return partitionRange; } @Override - public void setPartition(IntRangeBean partition) { - this.partition = partition; + public void setPartitionRange(IntRangeBean partitionRange) { + this.partitionRange = partitionRange; } @Override @@ -352,7 +353,7 @@ public void onChunkEnd(BiConsumer action) { } @Override - public void onChunkTryBegin(BiConsumer, IBatchChunkContext> action) { + public void onChunkTryBegin(BiConsumer, IBatchChunkContext> action) { synchronized (this) { if (isDone()) { throw new IllegalStateException("nop.err.execution-already-completed"); @@ -372,7 +373,7 @@ public void onChunkTryEnd(BiConsumer action) { } @Override - public void onConsumeBegin(BiConsumer, IBatchChunkContext> action) { + public void onConsumeBegin(BiConsumer, IBatchChunkContext> action) { synchronized (this) { if (isDone()) { throw new IllegalStateException("nop.err.execution-already-completed"); @@ -485,10 +486,10 @@ public void fireChunkEnd(IBatchChunkContext chunkContext, Throwable err) { } @Override - public void fireChunkTryBegin(List items, IBatchChunkContext chunkContext) { - List, IBatchChunkContext>> callbacks = this.onChunkTryBegin; + public void fireChunkTryBegin(Collection items, IBatchChunkContext chunkContext) { + List, IBatchChunkContext>> callbacks = this.onChunkTryBegin; if (callbacks != null) { - for (BiConsumer, IBatchChunkContext> callback : callbacks) { + for (BiConsumer, IBatchChunkContext> callback : callbacks) { callback.accept(items, chunkContext); } } @@ -506,10 +507,10 @@ public void fireChunkTryEnd(IBatchChunkContext chunkContext, Throwable err) { @Override - public void fireConsumeBegin(List items, IBatchChunkContext chunkContext) { - List, IBatchChunkContext>> callbacks = this.onConsumeBegin; + public void fireConsumeBegin(Collection items, IBatchChunkContext chunkContext) { + List, IBatchChunkContext>> callbacks = this.onConsumeBegin; if (callbacks != null) { - for (BiConsumer, IBatchChunkContext> callback : callbacks) { + for (BiConsumer, IBatchChunkContext> callback : callbacks) { callback.accept(items, chunkContext); } } diff --git a/nop-batch/nop-batch-core/src/test/java/io/nop/batch/core/TestBatchTask.java b/nop-batch/nop-batch-core/src/test/java/io/nop/batch/core/TestBatchTask.java index ffd0fcc57..146b423e6 100644 --- a/nop-batch/nop-batch-core/src/test/java/io/nop/batch/core/TestBatchTask.java +++ b/nop-batch/nop-batch-core/src/test/java/io/nop/batch/core/TestBatchTask.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -110,7 +111,7 @@ public IBatchConsumer setup(IBatchTaskContext context) { } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { count.addAndGet(items.size()); this.items.addAll(items); } diff --git a/nop-batch/nop-batch-dsl/src/main/java/io/nop/batch/dsl/manager/ModelBasedBatchTaskBuilderFactory.java b/nop-batch/nop-batch-dsl/src/main/java/io/nop/batch/dsl/manager/ModelBasedBatchTaskBuilderFactory.java index 4dc738799..b81f54240 100644 --- a/nop-batch/nop-batch-dsl/src/main/java/io/nop/batch/dsl/manager/ModelBasedBatchTaskBuilderFactory.java +++ b/nop-batch/nop-batch-dsl/src/main/java/io/nop/batch/dsl/manager/ModelBasedBatchTaskBuilderFactory.java @@ -3,6 +3,7 @@ import io.nop.api.core.convert.ConvertHelper; import io.nop.api.core.exceptions.NopException; import io.nop.api.core.ioc.IBeanProvider; +import io.nop.batch.core.BatchConstants; import io.nop.batch.core.BatchDispatchConfig; import io.nop.batch.core.BatchTaskBuilder; import io.nop.batch.core.IBatchAggregator; @@ -12,6 +13,7 @@ import io.nop.batch.core.IBatchMetaProvider; import io.nop.batch.core.IBatchProcessorProvider; import io.nop.batch.core.IBatchRecordFilter; +import io.nop.batch.core.IBatchRecordSnapshotBuilder; import io.nop.batch.core.IBatchStateStore; import io.nop.batch.core.IBatchTaskBuilder; import io.nop.batch.core.consumer.EmptyBatchConsumer; @@ -31,6 +33,7 @@ import io.nop.batch.dsl.model.BatchTaggerModel; import io.nop.batch.dsl.model.BatchTaskModel; import io.nop.batch.gen.loader.BatchGenLoaderProvider; +import io.nop.batch.orm.support.OrmBatchRecordSnapshotBuilder; import io.nop.commons.collections.OrderByComparator; import io.nop.commons.concurrent.executor.GlobalExecutors; import io.nop.commons.util.CollectionHelper; @@ -103,6 +106,13 @@ public IBatchTaskBuilder newTaskBuilder(IBeanProvider beanContainer) { builder.startLimit(batchTaskModel.getStartLimit()); builder.useBatchRequestGenerator(batchTaskModel.isUseBatchRequestGenerator()); + if (batchTaskModel.getAsyncProcessor() != null) + builder.asyncProcessor(batchTaskModel.getAsyncProcessor()); + + if (batchTaskModel.getAsyncProcessTimeout() != null) { + builder.asyncProcessTimeout(batchTaskModel.getAsyncProcessTimeout().toMillis()); + } + builder.batchSize(batchTaskModel.getBatchSize()); if (batchTaskModel.getJitterRatio() != null) builder.jitterRatio(batchTaskModel.getJitterRatio()); @@ -181,6 +191,10 @@ private void buildTask(BatchTaskBuilder builder, IBeanProvider b builder.dispatchConfig(buildDispatchConfig(batchTaskModel.getLoader().getDispatcher(), beanContainer)); } + if (batchTaskModel.getSnapshotBuilder() != null) { + builder.batchRecordSnapshotBuilder(getSnapshotBuilder(batchTaskModel.getSnapshotBuilder(), beanContainer)); + } + if (batchTaskModel.getProcessors() != null) { List> list = new ArrayList<>(batchTaskModel.getProcessors().size()); @@ -234,6 +248,12 @@ private void buildTask(BatchTaskBuilder builder, IBeanProvider b } } + private IBatchRecordSnapshotBuilder getSnapshotBuilder(String beanName, IBeanProvider beanContainer) { + if (beanName.equals(BatchConstants.SNAPSHOT_BUILDER_ORM_ENTITY)) + return new OrmBatchRecordSnapshotBuilder(ormTemplate, false); + return (IBatchRecordSnapshotBuilder) beanContainer.getBean(beanName); + } + private BatchDispatchConfig buildDispatchConfig(BatchLoaderDispatcherModel dispatcherModel, IBeanProvider beanProvider) { Executor executor = getExecutor(dispatcherModel.getExecutor(), beanProvider); int fetchThreadCount = dispatcherModel.getFetchThreadCount(); diff --git a/nop-batch/nop-batch-dsl/src/main/java/io/nop/batch/dsl/model/_gen/_BatchTaskModel.java b/nop-batch/nop-batch-dsl/src/main/java/io/nop/batch/dsl/model/_gen/_BatchTaskModel.java index 1eb2c01ae..3175d51d9 100644 --- a/nop-batch/nop-batch-dsl/src/main/java/io/nop/batch/dsl/model/_gen/_BatchTaskModel.java +++ b/nop-batch/nop-batch-dsl/src/main/java/io/nop/batch/dsl/model/_gen/_BatchTaskModel.java @@ -23,6 +23,20 @@ public abstract class _BatchTaskModel extends io.nop.core.resource.component.Abs */ private java.lang.Boolean _allowStartIfComplete ; + /** + * + * xml name: asyncProcessTimeout + * + */ + private java.time.Duration _asyncProcessTimeout ; + + /** + * + * xml name: asyncProcessor + * + */ + private java.lang.Boolean _asyncProcessor ; + /** * * xml name: batchSize @@ -136,6 +150,13 @@ public abstract class _BatchTaskModel extends io.nop.core.resource.component.Abs */ private io.nop.batch.dsl.model.BatchSkipPolicyModel _skipPolicy ; + /** + * + * xml name: snapshotBuilder + * + */ + private java.lang.String _snapshotBuilder ; + /** * * xml name: startLimit @@ -204,6 +225,44 @@ public void setAllowStartIfComplete(java.lang.Boolean value){ } + /** + * + * xml name: asyncProcessTimeout + * + */ + + public java.time.Duration getAsyncProcessTimeout(){ + return _asyncProcessTimeout; + } + + + public void setAsyncProcessTimeout(java.time.Duration value){ + checkAllowChange(); + + this._asyncProcessTimeout = value; + + } + + + /** + * + * xml name: asyncProcessor + * + */ + + public java.lang.Boolean getAsyncProcessor(){ + return _asyncProcessor; + } + + + public void setAsyncProcessor(java.lang.Boolean value){ + checkAllowChange(); + + this._asyncProcessor = value; + + } + + /** * * xml name: batchSize @@ -587,6 +646,25 @@ public void setSkipPolicy(io.nop.batch.dsl.model.BatchSkipPolicyModel value){ } + /** + * + * xml name: snapshotBuilder + * + */ + + public java.lang.String getSnapshotBuilder(){ + return _snapshotBuilder; + } + + + public void setSnapshotBuilder(java.lang.String value){ + checkAllowChange(); + + this._snapshotBuilder = value; + + } + + /** * * xml name: startLimit @@ -752,6 +830,8 @@ protected void outputJson(IJsonHandler out){ super.outputJson(out); out.putNotNull("allowStartIfComplete",this.getAllowStartIfComplete()); + out.putNotNull("asyncProcessTimeout",this.getAsyncProcessTimeout()); + out.putNotNull("asyncProcessor",this.getAsyncProcessor()); out.putNotNull("batchSize",this.getBatchSize()); out.putNotNull("concurrency",this.getConcurrency()); out.putNotNull("consumers",this.getConsumers()); @@ -768,6 +848,7 @@ protected void outputJson(IJsonHandler out){ out.putNotNull("singleMode",this.getSingleMode()); out.putNotNull("singleSession",this.getSingleSession()); out.putNotNull("skipPolicy",this.getSkipPolicy()); + out.putNotNull("snapshotBuilder",this.getSnapshotBuilder()); out.putNotNull("startLimit",this.getStartLimit()); out.putNotNull("tagger",this.getTagger()); out.putNotNull("taskKeyExpr",this.getTaskKeyExpr()); @@ -787,6 +868,8 @@ protected void copyTo(BatchTaskModel instance){ super.copyTo(instance); instance.setAllowStartIfComplete(this.getAllowStartIfComplete()); + instance.setAsyncProcessTimeout(this.getAsyncProcessTimeout()); + instance.setAsyncProcessor(this.getAsyncProcessor()); instance.setBatchSize(this.getBatchSize()); instance.setConcurrency(this.getConcurrency()); instance.setConsumers(this.getConsumers()); @@ -803,6 +886,7 @@ protected void copyTo(BatchTaskModel instance){ instance.setSingleMode(this.getSingleMode()); instance.setSingleSession(this.getSingleSession()); instance.setSkipPolicy(this.getSkipPolicy()); + instance.setSnapshotBuilder(this.getSnapshotBuilder()); instance.setStartLimit(this.getStartLimit()); instance.setTagger(this.getTagger()); instance.setTaskKeyExpr(this.getTaskKeyExpr()); diff --git a/nop-batch/nop-batch-dsl/src/main/resources/_vfs/nop/batch/xlib/batch.xlib b/nop-batch/nop-batch-dsl/src/main/resources/_vfs/nop/batch/xlib/batch.xlib index 74b6dd561..7d2b060b1 100644 --- a/nop-batch/nop-batch-dsl/src/main/resources/_vfs/nop/batch/xlib/batch.xlib +++ b/nop-batch/nop-batch-dsl/src/main/resources/_vfs/nop/batch/xlib/batch.xlib @@ -4,10 +4,16 @@ xmlns:x="/nop/schema/xdsl.xdef" xmlns:c="c"> + + + @@ -35,6 +41,8 @@ const taskStepRt = get('taskStepRt'); batchTaskContext.setFlowId(taskStepRt?.taskInstanceId); batchTaskContext.setFlowStepId(taskStepRt?.stepInstanceId); + batchTaskContext.setPartitionRange(partitionRange); + batchTaskContext.setParams(params); return batchTaskBuilder.buildTask(batchTaskContext).executeAsync(batchTaskContext); diff --git a/nop-batch/nop-batch-dsl/src/test/java/io/nop/batch/dsl/TestBatchTaskDsl.java b/nop-batch/nop-batch-dsl/src/test/java/io/nop/batch/dsl/TestBatchTaskDsl.java index d0bf90f20..6832fc5f2 100644 --- a/nop-batch/nop-batch-dsl/src/test/java/io/nop/batch/dsl/TestBatchTaskDsl.java +++ b/nop-batch/nop-batch-dsl/src/test/java/io/nop/batch/dsl/TestBatchTaskDsl.java @@ -39,7 +39,7 @@ public void testBatchTask() { makeInputFile(bizDate); taskRt.setInput("bizDate", bizDate); - Map outputs = task.execute(taskRt).get(); + Map outputs = task.execute(taskRt).syncGetOutputs(); System.out.println(outputs); } diff --git a/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcInsertBatchConsumer.java b/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcInsertBatchConsumer.java index 4c73abd82..0a3cb253c 100644 --- a/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcInsertBatchConsumer.java +++ b/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcInsertBatchConsumer.java @@ -18,7 +18,7 @@ import io.nop.dao.jdbc.JdbcBatcher; import io.nop.dataset.binder.IDataParameterBinder; -import java.util.List; +import java.util.Collection; import java.util.Map; public class JdbcInsertBatchConsumer implements IBatchConsumerProvider, IBatchConsumer { @@ -42,7 +42,7 @@ public IBatchConsumer setup(IBatchTaskContext context) { } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { SQL sql = SQL.begin().name("batch-insert").insertInto(tableName).end(); jdbcTemplate.runWithConnection(sql, conn -> { diff --git a/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcKeyDuplicateFilter.java b/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcKeyDuplicateFilter.java index 263c6931b..dd51bc505 100644 --- a/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcKeyDuplicateFilter.java +++ b/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcKeyDuplicateFilter.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -40,7 +41,7 @@ public JdbcKeyDuplicateFilter(IJdbcTemplate jdbcTemplate, String tableName, Map< } @Override - public List filterProcessed(List records, IBatchChunkContext context) { + public Collection filterProcessed(Collection records, IBatchChunkContext context) { if (keyBinders.size() == 1) { String keyCol = CollectionHelper.first(keyBinders.keySet()); IDataParameterBinder binder = keyBinders.get(keyCol); @@ -74,7 +75,7 @@ public List filterProcessed(List records, IBatchChunkContext context) { } } - private SQL buildSelectByKeySql(List records, String keyCol, IDataParameterBinder binder) { + private SQL buildSelectByKeySql(Collection records, String keyCol, IDataParameterBinder binder) { SQL.SqlBuilder sb = SQL.begin(); sb.select().append(keyCol); sb.from().sql(tableName); @@ -100,7 +101,7 @@ private List getCompositeKey(S record) { return ret; } - private SQL buildSelectByCompositeKeySql(List records) { + private SQL buildSelectByCompositeKeySql(Collection records) { SQL.SqlBuilder sb = SQL.begin(); sb.select().fields(null, keyBinders.keySet()); sb.from().sql(tableName); @@ -124,7 +125,7 @@ private void appendIdCond(SQL.SqlBuilder sb, S record) { } @Override - public void saveProcessed(List filtered, Throwable exception, IBatchChunkContext context) { + public void saveProcessed(Collection filtered, Throwable exception, IBatchChunkContext context) { } } diff --git a/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcUpdateBatchConsumer.java b/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcUpdateBatchConsumer.java index 233b478dd..d94cddc0f 100644 --- a/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcUpdateBatchConsumer.java +++ b/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/consumer/JdbcUpdateBatchConsumer.java @@ -19,7 +19,6 @@ import io.nop.dataset.binder.IDataParameterBinder; import java.util.Collection; -import java.util.List; import java.util.Map; public class JdbcUpdateBatchConsumer implements IBatchConsumerProvider, IBatchConsumer { @@ -46,7 +45,7 @@ public IBatchConsumer setup(IBatchTaskContext context) { } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { SQL sql = SQL.begin().name("batch-update").insertInto(tableName).end(); jdbcTemplate.runWithConnection(sql, conn -> { diff --git a/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/loader/JdbcBatchLoaderProvider.java b/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/loader/JdbcBatchLoaderProvider.java index a98ffe1ca..9a308c0d8 100644 --- a/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/loader/JdbcBatchLoaderProvider.java +++ b/nop-batch/nop-batch-jdbc/src/main/java/io/nop/batch/jdbc/loader/JdbcBatchLoaderProvider.java @@ -192,7 +192,7 @@ LoaderState newState(IBatchTaskContext context) { if (query == null) query = new QueryBean(); - IntRangeBean range = context.getPartition(); + IntRangeBean range = context.getPartitionRange(); // 自动增加分区条件 if (partitionIndexField != null && range != null) { diff --git a/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/consumer/OrmBatchConsumer.java b/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/consumer/OrmBatchConsumer.java index 0225a74bc..eba739e9c 100644 --- a/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/consumer/OrmBatchConsumer.java +++ b/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/consumer/OrmBatchConsumer.java @@ -56,7 +56,7 @@ private List getColumns(IOrmEntityDao dao, Collection f } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { Map keyMap = new HashMap<>(items.size()); for (R item : items) { keyMap.put(getKey(item), item); diff --git a/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/consumer/OrmInsertBatchConsumer.java b/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/consumer/OrmInsertBatchConsumer.java index 49ab0f123..bc0840349 100644 --- a/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/consumer/OrmInsertBatchConsumer.java +++ b/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/consumer/OrmInsertBatchConsumer.java @@ -6,7 +6,7 @@ import io.nop.orm.IOrmEntity; import io.nop.orm.IOrmTemplate; -import java.util.List; +import java.util.Collection; public class OrmInsertBatchConsumer implements IBatchConsumerProvider.IBatchConsumer, IBatchConsumerProvider { private final IOrmTemplate ormTemplate; @@ -21,7 +21,7 @@ public IBatchConsumer setup(IBatchTaskContext context) { } @Override - public void consume(List items, IBatchChunkContext context) { + public void consume(Collection items, IBatchChunkContext context) { ormTemplate.batchSaveOrUpdate(items); } } \ No newline at end of file diff --git a/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/loader/OrmQueryBatchLoaderProvider.java b/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/loader/OrmQueryBatchLoaderProvider.java index 0623f2c3a..1c76f3c3a 100644 --- a/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/loader/OrmQueryBatchLoaderProvider.java +++ b/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/loader/OrmQueryBatchLoaderProvider.java @@ -102,7 +102,7 @@ public IBatchLoader setup(IBatchTaskContext context) { LoaderState newLoaderState(IBatchTaskContext context) { LoaderState state = new LoaderState<>(); state.query = queryBuilder == null ? new QueryBean() : queryBuilder.buildQuery(context); - IntRangeBean range = context.getPartition(); + IntRangeBean range = context.getPartitionRange(); // 自动增加分区条件 if (partitionIndexField != null && range != null) { diff --git a/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/support/OrmBatchRecordSnapshotBuilder.java b/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/support/OrmBatchRecordSnapshotBuilder.java index 0de3be839..8369b023d 100644 --- a/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/support/OrmBatchRecordSnapshotBuilder.java +++ b/nop-batch/nop-batch-orm/src/main/java/io/nop/batch/orm/support/OrmBatchRecordSnapshotBuilder.java @@ -8,6 +8,7 @@ import io.nop.orm.IOrmTemplate; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -23,14 +24,14 @@ public OrmBatchRecordSnapshotBuilder(IOrmTemplate ormTemplate, boolean cascadeAt } @Override - public ISnapshot buildSnapshot(List items, IBatchChunkContext ctx) { + public ISnapshot buildSnapshot(Collection items, IBatchChunkContext ctx) { Map> map = new HashMap<>(); for (IDaoEntity item : items) { map.put(item, item.orm_initedValues()); } return new ISnapshot() { @Override - public List restore(List list, IBatchChunkContext chunkContext) { + public Collection restore(Collection list, IBatchChunkContext chunkContext) { IOrmSession session = ormTemplate.currentSession(); List ret = new ArrayList<>(list.size()); for (T item : list) { diff --git a/nop-cli/demo/_vfs/batch/batch-demo.task.xml b/nop-cli/demo/_vfs/batch/batch-demo.task.xml index cb99e523a..77d703438 100644 --- a/nop-cli/demo/_vfs/batch/batch-demo.task.xml +++ b/nop-cli/demo/_vfs/batch/batch-demo.task.xml @@ -37,6 +37,7 @@ + 0){ @@ -89,6 +90,7 @@ + return item instanceof String; diff --git a/nop-commons/src/main/java/io/nop/commons/concurrent/executor/GlobalExecutors.java b/nop-commons/src/main/java/io/nop/commons/concurrent/executor/GlobalExecutors.java index e1a028924..ad4f98573 100644 --- a/nop-commons/src/main/java/io/nop/commons/concurrent/executor/GlobalExecutors.java +++ b/nop-commons/src/main/java/io/nop/commons/concurrent/executor/GlobalExecutors.java @@ -84,6 +84,10 @@ public static IThreadPoolExecutor globalWorker() { }); } + public static IThreadPoolExecutor syncExecutor() { + return SyncThreadPoolExecutor.INSTANCE; + } + public static void register(IThreadPoolExecutor executor) { IThreadPoolExecutor old = g_executors.put(executor.getName(), executor); if (old != null) { diff --git a/nop-excel/src/main/java/io/nop/excel/imp/ImportExcelParser.java b/nop-excel/src/main/java/io/nop/excel/imp/ImportExcelParser.java index 1f1473851..3a3811b96 100644 --- a/nop-excel/src/main/java/io/nop/excel/imp/ImportExcelParser.java +++ b/nop-excel/src/main/java/io/nop/excel/imp/ImportExcelParser.java @@ -35,7 +35,7 @@ import java.util.Map; import static io.nop.commons.cache.CacheConfig.newConfig; -import static io.nop.core.CoreErrors.ARG_PROP_NAME; +import static io.nop.excel.ExcelErrors.ARG_KEY_PROP; import static io.nop.excel.ExcelErrors.ARG_NAME_PATTERN; import static io.nop.excel.ExcelErrors.ARG_SHEET_NAME; import static io.nop.excel.ExcelErrors.ERR_IMPORT_MISSING_MANDATORY_SHEET; @@ -176,7 +176,7 @@ private void parseSheets(ImportSheetModel sheetModel, List sheets, D if (list.size() != sheets.size()) throw new NopException(ERR_IMPORT_SHEET_WITH_DUPLICATE_KEY_PROP) .param(ARG_SHEET_NAME, sheetModel.getName()) - .param(ARG_PROP_NAME, sheetModel.getKeyProp()); + .param(ARG_KEY_PROP, sheetModel.getKeyProp()); if (sheetModel.isMultipleAsMap()) { Map map = new LinkedHashMap<>(); diff --git a/nop-xdefs/src/main/resources/_vfs/nop/schema/task/batch.xdef b/nop-xdefs/src/main/resources/_vfs/nop/schema/task/batch.xdef index 14fdc4870..f8b754d44 100644 --- a/nop-xdefs/src/main/resources/_vfs/nop/schema/task/batch.xdef +++ b/nop-xdefs/src/main/resources/_vfs/nop/schema/task/batch.xdef @@ -12,6 +12,7 @@ rateLimit="double" jitterRatio="double" allowStartIfComplete="boolean" startLimit="!int=0" useBatchRequestGenerator="!boolean=false" executor="bean-name" xdef:name="BatchTaskModel" xdef:bean-package="io.nop.batch.dsl.model" + asyncProcessor="boolean" asyncProcessTimeout="duration" snapshotBuilder="bean-name" x:schema="/nop/schema/xdef.xdef" xmlns:x="/nop/schema/xdsl.xdef" xdef:model-name-prop="taskName" xdef:model-version-prop="taskVersion" xmlns:xdef="/nop/schema/xdef.xdef" @@ -54,6 +55,7 @@ + @@ -83,6 +85,9 @@ + @@ -98,6 +103,7 @@ +