Skip to content

Commit

Permalink
改进batch模型,consumer增加transformer配置
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Dec 18, 2024
1 parent 50f0bda commit 0de2ef5
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 3 deletions.
54 changes: 54 additions & 0 deletions docs/dev-guide/batch/batch-task.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,59 @@
* allowUpdate:如果发现数据已经存在,是否允许更新数据,缺省为false。


## 参数传递

1. IBatchTaskContext和IBatchChunkContext都提供了setAttribute/getAttribute方法,可以用于在整个批处理任务环境以及单个Chunk执行环境中共享数据。

```
<processor>
<source>
const myVar = batchChunkCtx.taskContext.getAttribute('myVar');
</source>
</processor>
```

2. 在启用BatchTask的saveState属性的情况下,可以通过`IBatchTaskContext.getPersistVar/setPersistVar`来持久化保存变量信息。


## 常见问题

### 1. 数据文件中有两列数据,属性名分别为 a, b。 现在我要将这个文件导入数据库的 T1, T2 表, 属性 a 对应 T1表的字段 c1,属性 b 对应 T2表的字段 c1。 这种场景的映射关系怎么处理?

如果使用ORM来保存,则在processor中直接调用`dao.xlib`中的`SaveEntity`标签即可。该标签只是调用`ormSession.save`,数据库操作会自动延迟提交。

```xml
<processor name="saveCustomer">
<source>
<dao:SaveEntity entityName="DemoCustomer" data="${{
firstName: item.customer.firstName,
lastName: item.customer.familyName,
gender: item.customer.gender,
customerNumber: item.customerNumber,
idCard: item.customer.idCard,
partitionIndex: item.customerNumber.$shortHash()
}}" xpl:lib="/nop/orm/xlib/dao.xlib"/>
</source>
</processor>

```

另外,可以在processor中多次调用consume来输出多个结果,然后在consumer(writer)中配置filter,过滤接收各自需要的结果即可。

```xml
<processor name="process">
<source>
...
consume(result1);
consume(result2);
</source>
</processor>

<consumer name="saveResult1">
<filter>
return item.name == 'result1';
</filter>

<file-writer filePath="result.csv"/>
</consumer>
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.nop.batch.core.consumer;

import io.nop.batch.core.IBatchChunkContext;
import io.nop.batch.core.IBatchConsumerProvider;
import io.nop.batch.core.IBatchTaskContext;
import io.nop.core.lang.eval.IEvalFunction;

import java.util.Collection;

/**
 * Batch consumer whose consume logic is delegated to an {@link IEvalFunction}.
 * <p>
 * The class is both the provider and the consumer: {@link #setup(IBatchTaskContext)}
 * simply returns this instance.
 *
 * @param <R> type of the records handed to the consumer
 */
public class EvalBatchConsumer<R> implements IBatchConsumerProvider.IBatchConsumer<R>, IBatchConsumerProvider<R> {

    /** Function invoked once for every chunk of records. */
    private final IEvalFunction consumeFn;

    /**
     * @param func function that receives the records and the chunk context
     */
    public EvalBatchConsumer(IEvalFunction func) {
        this.consumeFn = func;
    }

    /**
     * Returns this object itself as the consumer for the task.
     */
    @Override
    public IBatchConsumer<R> setup(IBatchTaskContext context) {
        return this;
    }

    /**
     * Calls the wrapped function with the records and the chunk context,
     * using the eval scope obtained from the chunk context.
     */
    @Override
    public void consume(Collection<R> records, IBatchChunkContext context) {
        consumeFn.call2(
                null,
                records,
                context,
                context.getEvalScope());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.nop.batch.core.consumer;

import io.nop.batch.core.IBatchChunkContext;
import io.nop.batch.core.IBatchConsumerProvider;
import io.nop.batch.core.IBatchTaskContext;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;

/**
 * Wraps another {@link IBatchConsumerProvider} so that every item is passed through a
 * transformer before it is handed to the wrapped consumer.
 * <p>
 * Items for which the transformer returns {@code null} are dropped.
 *
 * @param <R> type of the items being consumed
 */
public class TransformedBatchConsumerProvider<R> implements IBatchConsumerProvider<R> {
    private final IBatchConsumerProvider<R> provider;
    private final BiFunction<R, IBatchChunkContext, R> transformer;

    /**
     * @param provider    provider of the consumer that receives the transformed items
     * @param transformer function applied to each item together with the chunk context;
     *                    a {@code null} result means "skip this item"
     */
    public TransformedBatchConsumerProvider(IBatchConsumerProvider<R> provider, BiFunction<R, IBatchChunkContext, R> transformer) {
        this.provider = provider;
        this.transformer = transformer;
    }

    /**
     * Sets up the wrapped provider and decorates the consumer it returns.
     */
    @Override
    public IBatchConsumer<R> setup(IBatchTaskContext context) {
        // Use the diamond operator rather than a raw type so the wrapper stays type-checked
        // and no unchecked conversion is needed.
        return new TransformedBatchConsumer<>(provider.setup(context), transformer);
    }

    /**
     * Consumer that transforms each item and forwards the non-null results to a delegate.
     */
    static class TransformedBatchConsumer<R> implements IBatchConsumer<R> {
        private final IBatchConsumer<R> consumer;
        private final BiFunction<R, IBatchChunkContext, R> transformer;

        public TransformedBatchConsumer(IBatchConsumer<R> consumer, BiFunction<R, IBatchChunkContext, R> transformer) {
            this.consumer = consumer;
            this.transformer = transformer;
        }

        /**
         * Applies the transformer to every item in iteration order, collects the non-null
         * results into a new list and passes that list to the delegate consumer.
         * The delegate is called even when the resulting list is empty.
         */
        @Override
        public void consume(Collection<R> items, IBatchChunkContext context) {
            List<R> list = new ArrayList<>(items.size());
            for (R item : items) {
                R newItem = transformer.apply(item, context);
                // A null result means the transformer chose to ignore this item.
                if (newItem != null) {
                    list.add(newItem);
                }
            }
            consumer.consume(list, context);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.nop.batch.core.IBatchStateStore;
import io.nop.batch.core.IBatchTaskBuilder;
import io.nop.batch.core.consumer.EmptyBatchConsumer;
import io.nop.batch.core.consumer.EvalBatchConsumer;
import io.nop.batch.core.consumer.MultiBatchConsumerProvider;
import io.nop.batch.core.consumer.ResourceRecordConsumerProvider;
import io.nop.batch.core.consumer.SplitBatchConsumer;
import io.nop.batch.core.consumer.TransformedBatchConsumerProvider;
import io.nop.batch.core.filter.EvalBatchRecordFilter;
import io.nop.batch.core.loader.PostProcessBatchLoaderProvider;
import io.nop.batch.core.processor.FilterBatchProcessor;
Expand Down Expand Up @@ -515,12 +517,22 @@ private IBatchConsumerProvider<Object> getWriter0(BatchConsumerModel consumerMod
ret = newOrmWriter(consumerModel.getOrmWriter(), daoProvider, ormTemplate);
} else if (consumerModel.getJdbcWriter() != null) {
ret = newJdbcWriter(consumerModel.getJdbcWriter(), jdbcTemplate);
} else if (consumerModel.getSource() != null) {
ret = newXplWriter(consumerModel.getSource());
} else {
ret = null;
}

if (ret != null && consumerModel.getTransformer() != null)
ret = new TransformedBatchConsumerProvider<>(ret,
(item, chunkCtx) -> consumerModel.getTransformer().call2(null, item, chunkCtx, chunkCtx.getEvalScope()));
return addFilterForWriter(consumerModel, ret);
}

/**
 * Builds a consumer provider from the given function by wrapping it in an
 * {@link EvalBatchConsumer}.
 *
 * @param source function that the resulting consumer invokes
 * @return provider backed by {@code source}
 */
private IBatchConsumerProvider<Object> newXplWriter(IEvalFunction source) {
    IBatchConsumerProvider<Object> xplWriter = new EvalBatchConsumer<>(source);
    return xplWriter;
}

private IBatchConsumerProvider<Object> addFilterForWriter(BatchConsumerModel consumerModel, IBatchConsumerProvider<Object> consumer) {
if (consumerModel.getFilter() == null)
return consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ public abstract class _BatchConsumerModel extends io.nop.batch.dsl.model.BatchLi
*/
private io.nop.core.lang.eval.IEvalFunction _source ;

/**
*
* xml name: transformer
* The transformer is executed before the actual write to convert the item's structure.
* The object that gets saved is the transformer's return value. If the transformer
* returns null, the item is ignored.
*/
private io.nop.core.lang.eval.IEvalFunction _transformer ;

/**
*
* xml name: adapter
Expand Down Expand Up @@ -328,6 +335,25 @@ public void setSource(io.nop.core.lang.eval.IEvalFunction value){
}


/**
 * xml name: transformer
 * <p>
 * The transformer is executed before the actual write to convert the item's structure.
 * The object that gets saved is the transformer's return value. If the transformer
 * returns null, the item is ignored.
 *
 * @return the configured transformer function, or null when none has been set
 */
public io.nop.core.lang.eval.IEvalFunction getTransformer(){
    return this._transformer;
}


/**
 * Stores the transformer function of this consumer model.
 *
 * @param value the transformer function to store
 */
public void setTransformer(io.nop.core.lang.eval.IEvalFunction value){
    // NOTE(review): presumably rejects changes once the model is frozen — confirm against checkAllowChange.
    checkAllowChange();
    this._transformer = value;
}



@Override
public void freeze(boolean cascade){
Expand Down Expand Up @@ -361,6 +387,7 @@ protected void outputJson(IJsonHandler out){
out.putNotNull("order",this.getOrder());
out.putNotNull("ormWriter",this.getOrmWriter());
out.putNotNull("source",this.getSource());
out.putNotNull("transformer",this.getTransformer());
}

public BatchConsumerModel cloneInstance(){
Expand All @@ -384,6 +411,7 @@ protected void copyTo(BatchConsumerModel instance){
instance.setOrder(this.getOrder());
instance.setOrmWriter(this.getOrmWriter());
instance.setSource(this.getSource());
instance.setTransformer(this.getTransformer());
}

protected BatchConsumerModel newInstance(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class _BatchTaskModel extends io.nop.batch.dsl.model.BatchListen
* xml name: concurrency
* 同时启动多少个线程去并行处理。设置了concurrency的情况下,还需要设置executor才会真正并行执行,否则会使用SyncExecutor在当前线程上串行执行
*/
private int _concurrency = 1;
private int _concurrency = 0;

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public void consume(Collection<R> items, IBatchChunkContext context) {
if (!allowUpdate)
return;

if (entity == item)
return;

OrmBatchHelper.assignEntity(entity, item);
dao.updateEntity(entity);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public abstract class _TaskStepModel extends io.nop.task.model.TaskExecutableMod
/**
*
* xml name: sync
*
* 步骤执行时同步调用TaskStepReturn.sync()同步等待本步骤执行完毕,然后再继续执行下一步骤。缺省情况下会使用Promise异步等待。
*/
private boolean _sync = false;

Expand Down Expand Up @@ -392,7 +392,7 @@ public void setSaveState(java.lang.Boolean value){
/**
*
* xml name: sync
*
* 步骤执行时同步调用TaskStepReturn.sync()同步等待本步骤执行完毕,然后再继续执行下一步骤。缺省情况下会使用Promise异步等待。
*/

public boolean isSync(){
Expand Down
3 changes: 3 additions & 0 deletions nop-xdefs/src/main/resources/_vfs/nop/schema/task/batch.xdef
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@
</fields>
</jdbc-writer>

<!-- 实际写入之前执行transformer进行结构转换。实际保存的是返回的结果对象。如果transformer的返回值是null,则忽略这个条目 -->
<transformer xdef:value="xpl-fn:(item,batchChunkCtx)=>any"/>

<source xdef:value="xpl-fn:(items,batchChunkCtx)=>void"/>

<adapter xdef:value="xpl-fn:(consumer)=>any"/>
Expand Down

0 comments on commit 0de2ef5

Please sign in to comment.