Improve batch loaders
entropy-cloud committed Apr 19, 2024
1 parent 4b5f526 commit e4a1560
Showing 9 changed files with 104 additions and 30 deletions.
8 changes: 4 additions & 4 deletions docs/arch/module-dependency.md
@@ -57,7 +57,7 @@ The backend services of the Nop platform are implemented with the NopGraphQL engine. The NopGraphQL engine does not use…

![](images/rpc-modules.png)

On top of the GraphQL engine, the Nop platform builds a distributed RPC mechanism; see [rpc-design.md](https://gitee.com/canonical-entropy/nop-entropy/blob/master/docs/dev-guide/microservice/rpc-design.md)
On top of the GraphQL engine, the Nop platform builds a distributed RPC mechanism; see [rpc-design.md](../dev-guide/microservice/rpc-design.md)

* nop-cluster-core provides cluster support mechanisms such as load balancing, service discovery, and leader election

@@ -107,15 +107,15 @@ The overall design of the Nop platform adopts a flexible organization whose parts can be split apart or recombined. The initial code…

![](images/report-modules.png)

NopReport is a reporting engine that uses Excel as its visual designer and supports Chinese-style reports; see [report.md](https://gitee.com/canonical-entropy/nop-entropy/blob/master/docs/user-guide/report.md) for usage
NopReport is a reporting engine that uses Excel as its visual designer and supports Chinese-style reports; see [report.md](../user-guide/report.md) for usage

* nop-report-core implements core mechanisms such as the report expansion algorithm, report expressions, and report functions

* nop-ooxml-xlsx provides Excel file parsing and saving. Its implementation uses neither Apache POI nor any other third-party package; it parses the raw Excel files directly with an XML parser, so its total code size is small and it can be embedded on the Android platform

* nop-excel provides the ExcelWorkbook model object. Via the workbook.xdef meta-model, bidirectional conversion between the XML-format DSL and ExcelWorkbook objects is implemented automatically, so ExcelWorkbook exists as a domain model independent of the Excel application.

* nop-ooxml-docx provides a visually editable Word template mechanism; see [word-template.md](https://gitee.com/canonical-entropy/nop-entropy/blob/master/docs/dev-guide/report/word-template.md) for usage
* nop-ooxml-docx provides a visually editable Word template mechanism; see [word-template.md](../dev-guide/report/word-template.md) for usage

## 7. Automated Testing Framework

@@ -143,7 +143,7 @@ In principle, the Nop platform's automated testing framework is a general-purpose design that does not depend on JU…

![](images/cli-modules.png)

The nop-cli command-line tool aggregates a set of independently usable features; see [cli.md](https://gitee.com/canonical-entropy/nop-entropy/blob/master/docs/dev-guide/cli.md) for details
The nop-cli command-line tool aggregates a set of independently usable features; see [cli.md](../dev-guide/cli.md) for details

* the gen command relies on the nop-codegen module to perform code generation

2 changes: 2 additions & 0 deletions docs/dev-guide/orm/native-db.md
@@ -0,0 +1,2 @@
# MySQL
1. Streaming mode is only enabled when fetchSize is set to Integer.MIN_VALUE; otherwise the driver loads the entire result set into memory
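
For illustration, here is a minimal plain-JDBC sketch of this streaming behavior; the connection URL, credentials, and table name are placeholders:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MySqlStreamingDemo {
    public static void main(String[] args) throws SQLException {
        // Placeholder URL and credentials, for illustration only
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "password");
             PreparedStatement ps = conn.prepareStatement(
                     "SELECT * FROM big_table",
                     ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            // Integer.MIN_VALUE is the MySQL driver's special flag for streaming;
            // any other fetchSize lets the driver buffer the entire result set.
            ps.setFetchSize(Integer.MIN_VALUE);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // process one row at a time; the full set never sits in memory
                }
            }
        }
    }
}
```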
@@ -25,8 +25,8 @@
import io.nop.dataset.impl.DefaultFieldMapper;
import io.nop.dataset.record.impl.RecordInputImpls;
import io.nop.dataset.rowmapper.ColumnMapRowMapper;

import jakarta.inject.Inject;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -41,10 +41,17 @@ public class JdbcBatchLoader<T> implements IBatchLoader<T, IBatchChunkContext>,

private INamedSqlBuilder namedSqlBuilder;

private String sqlName;

@SuppressWarnings("unchecked")
private IRowMapper<T> rowMapper = (IRowMapper<T>) ColumnMapRowMapper.CASE_INSENSITIVE;

private SQL sql;

private long maxRows;

private int maxFieldsSize;

private Connection connection;

private IDialect dialect;
@@ -82,11 +89,29 @@ public void setSql(SQL sql) {
}

public void setSqlName(String sqlName) {
setSqlGenerator(ctx -> namedSqlBuilder.buildSql(sqlName, ctx));
this.sqlName = sqlName;
}

public long getMaxRows() {
return maxRows;
}

public void setMaxRows(long maxRows) {
this.maxRows = maxRows;
}

public void setMaxFieldsSize(int maxFieldsSize) {
this.maxFieldsSize = maxFieldsSize;
}

@Override
public void onTaskBegin(IBatchTaskContext context) {
if (sqlGenerator == null) {
if (sqlName != null)
setSqlGenerator(ctx -> namedSqlBuilder.buildSql(sqlName, ctx));
}
Guard.notNull(sqlGenerator, "sqlGenerator");

this.sql = sqlGenerator.generateSql(context);
this.dialect = DialectManager.instance().getDialectForDataSource(dataSource);
try {
@@ -100,6 +125,17 @@ public void onTaskBegin(IBatchTaskContext context) {
ps = JdbcHelper.prepareStatement(dialect, connection, sql);
JdbcHelper.setQueryTimeout(dialect, ps, sql, false);

if (maxRows > 0) {
if (maxRows < Integer.MAX_VALUE) {
ps.setMaxRows((int) maxRows);
} else {
ps.setLargeMaxRows(maxRows);
}
}

if (maxFieldsSize > 0)
ps.setMaxFieldSize(maxFieldsSize);

ResultSet rs = ps.executeQuery();
this.dataSet = new JdbcDataSet(dialect, rs);
} catch (SQLException e) {
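
Net effect of the JdbcBatchLoader changes above: setSqlName() now only records the name, and the SQL generator is resolved lazily in onTaskBegin(), after the injected namedSqlBuilder is available; maxRows and maxFieldsSize are then applied to the prepared statement. A hedged configuration sketch follows; the sql-lib key and limit values are illustrative, and the DataSource/namedSqlBuilder wiring is assumed to be done by the IoC container:

```java
import java.util.Map;

// Assumes JdbcBatchLoader from the nop-batch-jdbc module is on the classpath;
// its exact package is omitted here.
public class JdbcLoaderConfigSketch {
    static JdbcBatchLoader<Map<String, Object>> newLoader() {
        // The default rowMapper is ColumnMapRowMapper.CASE_INSENSITIVE, so each
        // row arrives as a case-insensitive Map<String, Object>.
        JdbcBatchLoader<Map<String, Object>> loader = new JdbcBatchLoader<>();
        loader.setSqlName("test.findActiveUsers"); // illustrative key, resolved lazily in onTaskBegin()
        loader.setMaxRows(1_000_000L);      // values >= Integer.MAX_VALUE go through setLargeMaxRows
        loader.setMaxFieldsSize(64 * 1024); // caps the bytes returned per column
        return loader;
    }
}
```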
@@ -19,8 +19,8 @@
import io.nop.dao.jdbc.IJdbcTemplate;
import io.nop.dataset.IRowMapper;
import io.nop.dataset.rowmapper.ColumnMapRowMapper;

import jakarta.inject.Inject;

import java.util.List;

public class JdbcPageBatchLoader<T> implements IBatchLoader<T, IBatchChunkContext>, IBatchTaskListener {
@@ -30,6 +30,7 @@ public class JdbcPageBatchLoader<T> implements IBatchLoader<T, IBatchChunkContex

private INamedSqlBuilder namedSqlBuilder;

@SuppressWarnings("unchecked")
private IRowMapper<T> rowMapper = (IRowMapper<T>) ColumnMapRowMapper.CASE_INSENSITIVE;

private SQL sql;
@@ -16,8 +16,8 @@
import io.nop.dao.api.IDaoProvider;
import io.nop.dao.api.IEntityDao;
import io.nop.dao.api.IQueryBuilder;

import jakarta.inject.Inject;

import java.util.List;

public class OrmQueryBatchLoader<S extends IDaoEntity> implements IBatchLoader<S, IBatchChunkContext>, IBatchTaskListener {
@@ -29,6 +29,10 @@ public class OrmQueryBatchLoader<S extends IDaoEntity> implements IBatchLoader<S
private S lastEntity;
private QueryBean query;

private IEntityDao<S> dao;

private List<String> batchLoadProps;

@Inject
public void setDaoProvider(IDaoProvider daoProvider) {
this.daoProvider = daoProvider;
@@ -38,10 +42,19 @@ public void setQueryBuilder(IQueryBuilder queryBuilder) {
this.queryBuilder = queryBuilder;
}

public List<String> getBatchLoadProps() {
return batchLoadProps;
}

public void setBatchLoadProps(List<String> batchLoadProps) {
this.batchLoadProps = batchLoadProps;
}

@Override
public void onTaskBegin(IBatchTaskContext context) {
query = queryBuilder.buildQuery(context);
lastEntity = null;
dao = daoProvider.dao(query.getSourceName());
}

@Override
Expand All @@ -51,15 +64,17 @@ public void onTaskEnd(Throwable exception, IBatchTaskContext context) {
}

@Override
public List<S> load(int batchSize, IBatchChunkContext context) {
IEntityDao<S> dao = daoProvider.dao(query.getSourceName());
public synchronized List<S> load(int batchSize, IBatchChunkContext context) {
List<S> list = dao.findNext(lastEntity, query.getFilter(), query.getOrderBy(), batchSize);

if (list.isEmpty()) {
return list;
}

lastEntity = list.get(list.size() - 1);

if (batchLoadProps != null)
dao.batchLoadProps(list, batchLoadProps);
return list;
}
}
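
Two behavioral notes on the changes above: load() is now synchronized because concurrent chunk threads share the lastEntity cursor, and the dao is resolved once per task in onTaskBegin(). The new batchLoadProps hook then eagerly loads association properties for each fetched chunk in one pass, avoiding one lazy query per entity (the classic N+1 problem). A hedged sketch; the import path and property paths are illustrative placeholders:

```java
import io.nop.dao.api.IDaoEntity; // import path assumed
import java.util.Arrays;

public class OrmLoaderConfigSketch {
    // The property paths below are placeholders; they are forwarded to
    // IEntityDao.batchLoadProps after each chunk is fetched, so the whole
    // chunk's associations load in one batch instead of N lazy queries.
    static <S extends IDaoEntity> void configure(OrmQueryBatchLoader<S> loader) {
        loader.setBatchLoadProps(Arrays.asList("dept", "roleMappings.role"));
    }
}
```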
1 change: 1 addition & 0 deletions nop-batch/pom.xml
@@ -28,5 +28,6 @@
<module>nop-batch-app</module>
<module>nop-batch-jdbc</module>
<module>nop-batch-meta</module>
<module>nop-batch-dsl</module>
</modules>
</project>
17 changes: 9 additions & 8 deletions nop-dao/src/main/java/io/nop/dao/jdbc/impl/JdbcHelper.java
@@ -15,18 +15,18 @@
import io.nop.commons.text.marker.Marker;
import io.nop.commons.text.marker.Markers;
import io.nop.commons.type.StdDataType;
import io.nop.commons.type.StdSqlType;
import io.nop.commons.util.IoHelper;
import io.nop.core.lang.sql.SQL;
import io.nop.commons.type.StdSqlType;
import io.nop.core.lang.sql.TypedValueMarker;
import io.nop.dataset.binder.IDataParameterBinder;
import io.nop.dao.DaoConfigs;
import io.nop.dataset.IDataSetMeta;
import io.nop.dataset.impl.BaseDataFieldMeta;
import io.nop.dataset.impl.BaseDataSetMeta;
import io.nop.dao.dialect.IDialect;
import io.nop.dao.dialect.pagination.IPaginationHandler;
import io.nop.dao.jdbc.dataset.JdbcStatement;
import io.nop.dataset.IDataSetMeta;
import io.nop.dataset.binder.IDataParameterBinder;
import io.nop.dataset.impl.BaseDataFieldMeta;
import io.nop.dataset.impl.BaseDataSetMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -72,7 +72,7 @@ public static boolean getAutoCommit(Connection conn, IDialect dialect) {
}

public static void setAutoCommit(Connection conn, boolean autoCommit, IDialect dialect) {
LOG.trace("nop.jdbc.set-auto-commit:autoCommit={},conn={}",conn,autoCommit);
LOG.trace("nop.jdbc.set-auto-commit:autoCommit={},conn={}", conn, autoCommit);
try {
conn.setAutoCommit(autoCommit);
} catch (SQLException e) {
@@ -142,7 +142,7 @@ static BaseDataFieldMeta getColumnMeta(ResultSetMetaData metadata, int column, C
tableName = normalize(tableName, tableNameLowerCase);
StdSqlType sqlType = StdSqlType.fromJdbcType(metadata.getColumnType(column));
StdDataType dataType = sqlType == null ? StdDataType.ANY : sqlType.getStdDataType();
return new BaseDataFieldMeta(name, sourceName, tableName, dataType,false);
return new BaseDataFieldMeta(name, sourceName, tableName, dataType, false);
}

public static CallableStatement prepareCallableStatement(IDialect dialect, Connection conn, SQL sql)
@@ -194,7 +194,8 @@ public static PreparedStatement prepareStatement(IDialect dialect, Connection co
PreparedStatement ps = conn.prepareStatement(sql.getText());

try {
if (sql.getFetchSize() > 0)
// For the MySQL driver, a fetchSize of Integer.MIN_VALUE is the special flag that enables streaming
if (sql.getFetchSize() > 0 || sql.getFetchSize() == Integer.MIN_VALUE)
ps.setFetchSize(sql.getFetchSize());

setParameters(dialect, ps, sql);
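
The widened guard above matters because the previous `> 0` check silently discarded MySQL's negative streaming flag. The same rule as a standalone helper:

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

public final class FetchSizeHelper {
    // Positive fetch sizes are applied as-is; Integer.MIN_VALUE is passed
    // through because the MySQL driver treats it as "enable streaming".
    // Any other value (0 or other negatives) keeps the driver default.
    static void applyFetchSize(PreparedStatement ps, int fetchSize) throws SQLException {
        if (fetchSize > 0 || fetchSize == Integer.MIN_VALUE) {
            ps.setFetchSize(fetchSize);
        }
    }
}
```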
2 changes: 2 additions & 0 deletions nop-xdefs/src/main/resources/_vfs/nop/schema/query/query.xdef
@@ -1,6 +1,8 @@
<?xml version="1.0" encoding="UTF-8" ?>

<query x:schema="/nop/schema/xdef.xdef" xmlns:x="/nop/schema/xdsl.xdef"
xmlns:xdef="/nop/schema/xdef.xdef"
xdef:name="QueryBean" xdef:bean-package="io.nop.api.core.beans.query"
>
<fields xdef:body-type="list" xdef:key-attr="name">
<field name="!string"/>
40 changes: 28 additions & 12 deletions nop-xdefs/src/main/resources/_vfs/nop/schema/task/batch.xdef
@@ -10,50 +10,66 @@
singleMode="boolean=false" singleSession="boolean"
transactionScope="enum:io.nop.batch.core.BatchTransactionScope"
rateLimit="double" jitterRatio="double"
executor="string"
executor="string" xdef:name="BatchTaskModel" xdef:bean-package="io.nop.batch.dsl.model"
x:schema="/nop/schema/xdef.xdef" xmlns:x="/nop/schema/xdsl.xdef"
xmlns:xdef="/nop/schema/xdef.xdef"
>
<retryPolicy maxRetryCount="int" retryDelay="int" maxRetryDelay="int"
<retryPolicy maxRetryCount="int" retryDelay="int" maxRetryDelay="int" xdef:name="BatchRetryPolicyModel"
exponentialDelay="boolean=true" jitterRatio="double=0.3">
<exceptionFilter xdef:value="xpl-predicate"/>
</retryPolicy>

<skipPolicy maxSkipCount="int">
<skipPolicy maxSkipCount="int" xdef:name="BatchSkipPolicyModel">
<exceptionFilter xdef:value="xpl-predicate"/>
</skipPolicy>

<inputSorter xdef:ref="/nop/schema/query/order-by.xdef"/>

<reader bean="bean-name">
<file-reader pathExpr="expr">
<xdef:define xdef:name="BatchListenersModel">
<onTaskBegin xdef:value="xpl-fn:(ctx)=>void"/>
<onTaskEnd xdef:value="xpl-fn:(err,ctx)=>void"/>

<onChunkBegin xdef:value="xpl-fn:(ctx)=>void"/>
<onChunkEnd xdef:value="xpl-fn:(err,ctx)=>void"/>

<onLoadBegin xdef:value="xpl-fn:(batchSize,ctx)=>void"/>
<onLoadEnd xdef:value="xpl-fn:(err,batchSize,ctx)=>void"/>

<onConsumeBegin xdef:value="xpl-fn:(items,ctx)=>void"/>
<onConsumeEnd xdef:value="xpl-fn:(err,items,ctx)=>void"/>
</xdef:define>

<reader bean="bean-name" xdef:name="BatchReaderModel" xdef:ref="BatchListenersModel">
<file-reader pathExpr="expr" xdef:name="BatchFileReaderModel">
<headers xdef:value="csv-set"/>
</file-reader>

<orm-reader>
<orm-reader xdef:name="BatchOrmReaderModel" batchLoadProps="csv-list">
<eql xdef:value="xpl-sql"/>
<query xdef:ref="../query/query.xdef"/>
</orm-reader>

<jdbc-reader querySpace="string">
<jdbc-reader querySpace="string" sqlName="string" xdef:name="BatchJdbcReaderModel">
<sql xdef:value="xpl-sql"/>
<query xdef:ref="../query/query.xdef"/>
</jdbc-reader>

<source xdef:value="xpl"/>
</reader>

<processor bean="bean-name" id="var-name" xdef:unique-attr="id">
<filter xdef:value="xpl-predicate" />
<processor bean="bean-name" id="var-name" order="!int=0"
xdef:unique-attr="id" xdef:name="BatchProcessorModel" xdef:ref="BatchListenersModel">
<filter xdef:value="xpl-predicate"/>
<source xdef:value="xpl"/>
</processor>

<chunk-processor bean="bean-name">
<chunk-processor bean="bean-name" xdef:name="BatchChunkProcessorModel" xdef:ref="BatchListenersModel">
<source xdef:value="xpl"/>
</chunk-processor>

<writer bean="bean-name" id="var-name" xdef:unique-attr="id">
<source xdef:value="xpl" />
<writer bean="bean-name" id="var-name" order="!int=0"
xdef:unique-attr="id" xdef:name="BatchWriterModel" xdef:ref="BatchListenersModel">
<source xdef:value="xpl"/>
</writer>

</batch>
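
The new xdef:name and xdef:bean-package attributes direct the Nop code generator to emit typed model beans for this DSL into io.nop.batch.dsl.model, presumably consumed by the new nop-batch-dsl module. A purely hypothetical sketch of the shape such a generated bean might take; the members are inferred from the schema above, not taken from actual generated code:

```java
package io.nop.batch.dsl.model;

// Hypothetical sketch of generator output; real generated classes may differ.
public class BatchTaskModel {
    private BatchRetryPolicyModel retryPolicy;
    private BatchSkipPolicyModel skipPolicy;
    private BatchReaderModel reader;

    public BatchReaderModel getReader() {
        return reader;
    }

    public void setReader(BatchReaderModel reader) {
        this.reader = reader;
    }

    // ... accessors for the remaining children (processor, chunk-processor, writer)
}
```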
