Skip to content

Commit

Permalink
batch模型支持encoding使用表达式设置
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Dec 24, 2024
1 parent 4e2134e commit de7c580
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.nop.batch.core.consumer;

import io.nop.api.core.convert.ConvertHelper;
import io.nop.api.core.exceptions.NopException;
import io.nop.batch.core.BatchConstants;
import io.nop.batch.core.IBatchAggregator;
Expand All @@ -16,13 +17,13 @@
import io.nop.batch.core.common.AbstractBatchResourceHandler;
import io.nop.batch.core.exceptions.BatchCancelException;
import io.nop.commons.util.IoHelper;
import io.nop.core.lang.eval.IEvalAction;
import io.nop.core.resource.IResource;
import io.nop.core.resource.record.IResourceRecordOutputProvider;
import io.nop.dataset.record.IRecordOutput;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.nop.batch.core.BatchErrors.ARG_RESOURCE_PATH;
Expand All @@ -36,7 +37,7 @@
public class ResourceRecordConsumerProvider<R> extends AbstractBatchResourceHandler
implements IBatchConsumerProvider<R> {
private IResourceRecordOutputProvider<R> recordIO;
private String encoding;
private IEvalAction encodingExpr;

/**
* 写入文件时如果发生异常,则抛出BatchCancelException,取消整个Batch任务处理
Expand Down Expand Up @@ -65,8 +66,8 @@ public void setRecordIO(IResourceRecordOutputProvider<R> recordIO) {
this.recordIO = recordIO;
}

public void setEncoding(String encoding) {
this.encoding = encoding;
public void setEncodingExpr(IEvalAction encodingExpr) {
this.encodingExpr = encodingExpr;
}

public void setAggregator(IBatchAggregator<R, Object, Map<String, Object>> aggregator) {
Expand Down Expand Up @@ -94,6 +95,7 @@ public IBatchConsumer<R> setup(IBatchTaskContext context) {
ConsumerState<R> newConsumerState(IBatchTaskContext context) {
ConsumerState<R> state = new ConsumerState<>();
IResource resource = getResource(context);
String encoding = this.encodingExpr == null ? null : ConvertHelper.toString(this.encodingExpr.invoke(context));
state.output = recordIO.openOutput(resource, encoding);
Map<String, Object> header = null;
if (metaProvider != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ResourceRecordLoaderProvider<S> extends AbstractBatchResourceHandle
implements IBatchLoaderProvider<S> {

private IResourceRecordInputProvider<S> recordIO;
private String encoding;
private IEvalAction encodingExpr;

// 对读取的数据进行汇总处理,例如统计读入的总行数等,最后在complete时写入到数据库中
private IBatchAggregator<S, Object, ?> aggregator;
Expand Down Expand Up @@ -85,8 +85,8 @@ static class LoaderState<S> {
IBatchTaskContext context;
}

public void setEncoding(String encoding) {
this.encoding = encoding;
public void setEncodingExpr(IEvalAction encodingExpr) {
this.encodingExpr = encodingExpr;
}

public void setAggregator(IBatchAggregator<S, Object, ?> aggregator) {
Expand Down Expand Up @@ -146,6 +146,7 @@ LoaderState<S> newLoaderState(IBatchTaskContext context) {
LoaderState<S> state = new LoaderState<>();
state.context = context;
IResource resource = getResource(context);
String encoding = this.encodingExpr == null ? null : ConvertHelper.toString(this.encodingExpr.invoke(context));
IRecordInput<S> input = recordIO.openInput(resource, encoding);

input.beforeRead(context.getAttributes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static IBatchLoaderProvider<Object> newFileReader(BatchFileReaderModel lo
if (loaderModel.getMaxCountExpr() != null)
loader.setMaxCountExpr(loaderModel.getMaxCountExpr());
loader.setPathExpr(loaderModel.getFilePath());
loader.setEncoding(loaderModel.getEncoding());
loader.setEncodingExpr(loaderModel.getEncoding());
loader.setAggregator(aggregator);
if (loaderModel.getFilter() != null) {
IBatchRecordFilter<Object, IBatchTaskContext> filter = new EvalBatchRecordFilter<>(loaderModel.getFilter());
Expand Down Expand Up @@ -92,7 +92,7 @@ public static ResourceRecordConsumerProvider<Object> newFileWriter(BatchFileWrit
IResourceLoader resourceLoader = loadResourceLoader(consumerModel.getResourceLoader(), beanContainer);

ResourceRecordConsumerProvider<Object> writer = new ResourceRecordConsumerProvider<>();
writer.setEncoding(consumerModel.getEncoding());
writer.setEncodingExpr(consumerModel.getEncoding());
writer.setPathExpr(consumerModel.getFilePath());
writer.setRecordIO(recordIO);
writer.setResourceLoader(resourceLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class _BatchFileReaderModel extends io.nop.core.resource.compone
* xml name: encoding
* 文件编码,缺省值为UTF-8
*/
private java.lang.String _encoding ;
private io.nop.core.lang.eval.IEvalAction _encoding ;

/**
*
Expand Down Expand Up @@ -125,12 +125,12 @@ public void setCsvFormat(java.lang.String value){
* 文件编码,缺省值为UTF-8
*/

public java.lang.String getEncoding(){
public io.nop.core.lang.eval.IEvalAction getEncoding(){
return _encoding;
}


public void setEncoding(java.lang.String value){
public void setEncoding(io.nop.core.lang.eval.IEvalAction value){
checkAllowChange();

this._encoding = value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class _BatchFileWriterModel extends io.nop.core.resource.compone
* xml name: encoding
*
*/
private java.lang.String _encoding ;
private io.nop.core.lang.eval.IEvalAction _encoding ;

/**
*
Expand Down Expand Up @@ -104,12 +104,12 @@ public void setCsvFormat(java.lang.String value){
*
*/

public java.lang.String getEncoding(){
public io.nop.core.lang.eval.IEvalAction getEncoding(){
return _encoding;
}


public void setEncoding(java.lang.String value){
public void setEncoding(io.nop.core.lang.eval.IEvalAction value){
checkAllowChange();

this._encoding = value;
Expand Down
4 changes: 2 additions & 2 deletions nop-xdefs/src/main/resources/_vfs/nop/schema/task/batch.xdef
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
@maxCount 读取的最大记录数,缺省值为-1,表示不限制
@fileModelPath 文件模型路径。当没有指定resourceIO和newRecordInputProvider时,根据fileModelPath自动生成resourceIO
-->
<file-reader filePath="!t-expr" xdef:name="BatchFileReaderModel" encoding="string"
<file-reader filePath="!t-expr" xdef:name="BatchFileReaderModel" encoding="t-expr"
csvFormat="string"
resourceLoader="bean-name" resourceIO="bean-name" maxCountExpr="expr" fileModelPath="v-path">
<!-- 动态创建resourceIO -->
Expand Down Expand Up @@ -147,7 +147,7 @@
aggregator="bean-name" metaProvider="bean-name">
<filter xdef:value="xpl-fn:(item,batchChunkCtx)=>boolean"/>

<file-writer filePath="!t-expr" xdef:name="BatchFileWriterModel" encoding="string"
<file-writer filePath="!t-expr" xdef:name="BatchFileWriterModel" encoding="t-expr"
csvFormat="string"
resourceLoader="bean-name" resourceIO="bean-name" fileModelPath="v-path">
<newRecordOutputProvider xdef:value="xpl"/>
Expand Down

0 comments on commit de7c580

Please sign in to comment.