diff --git a/nop-api-core/src/main/java/io/nop/api/core/ApiConstants.java b/nop-api-core/src/main/java/io/nop/api/core/ApiConstants.java index e8e5fa9a0..323d6fcf9 100644 --- a/nop-api-core/src/main/java/io/nop/api/core/ApiConstants.java +++ b/nop-api-core/src/main/java/io/nop/api/core/ApiConstants.java @@ -113,6 +113,12 @@ public interface ApiConstants extends FilterBeanConstants { String HEADER_SVC_NAME = "nop-svc-name"; + String HEADER_TOPIC = "nop-topic"; + + String HEADER_EVENT_TIME = "nop-event-time"; + + String HEADER_PROCESS_TIME = "nop-process-time"; + /** * 业务对象上执行的方法 */ diff --git a/nop-api-core/src/main/java/io/nop/api/core/util/ApiHeaders.java b/nop-api-core/src/main/java/io/nop/api/core/util/ApiHeaders.java index 72aa903eb..4d95b2e88 100644 --- a/nop-api-core/src/main/java/io/nop/api/core/util/ApiHeaders.java +++ b/nop-api-core/src/main/java/io/nop/api/core/util/ApiHeaders.java @@ -13,6 +13,7 @@ import io.nop.api.core.convert.ConvertHelper; import io.nop.api.core.exceptions.NopException; +import java.sql.Timestamp; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -316,6 +317,30 @@ public static void setSvcName(ApiMessage message, String value) { message.setHeader(ApiConstants.HEADER_SVC_NAME, value); } + public static String getTopic(ApiMessage message) { + return getStringHeader(message.getHeaders(), ApiConstants.HEADER_TOPIC); + } + + public static void setTopic(ApiMessage message, String value) { + message.setHeader(ApiConstants.HEADER_TOPIC, value); + } + + public static Timestamp getEventTime(ApiMessage message) { + return ConvertHelper.toTimestamp(getHeader(message.getHeaders(), ApiConstants.HEADER_EVENT_TIME)); + } + + public static void setEventTime(ApiMessage message, Timestamp value) { + message.setHeader(ApiConstants.HEADER_EVENT_TIME, ConvertHelper.toLong(value)); + } + + public static Timestamp getProcessTime(ApiMessage message) { + return ConvertHelper.toTimestamp(getHeader(message.getHeaders(), ApiConstants.HEADER_PROCESS_TIME)); + } + + public static void setProcessTime(ApiMessage message, Timestamp value) { + message.setHeader(ApiConstants.HEADER_PROCESS_TIME, ConvertHelper.toLong(value)); + } + public static String getSvcAction(ApiMessage message) { return getStringHeader(message.getHeaders(), ApiConstants.HEADER_SVC_ACTION); } diff --git a/nop-commons/src/main/java/io/nop/commons/type/StdSqlType.java b/nop-commons/src/main/java/io/nop/commons/type/StdSqlType.java index c868b0c25..445d0c787 100644 --- a/nop-commons/src/main/java/io/nop/commons/type/StdSqlType.java +++ b/nop-commons/src/main/java/io/nop/commons/type/StdSqlType.java @@ -88,7 +88,7 @@ public enum StdSqlType implements ISqlDataType { // ROW(PrecScale.NO_NO, false, Types.STRUCT, null), OTHER(false, false, Types.OTHER, StdDataType.ANY), - JSON(false, false, Types.JAVA_OBJECT, StdDataType.ANY), + JSON(false, false, Types.VARCHAR, StdDataType.STRING), ANY(false, false, Types.JAVA_OBJECT, StdDataType.ANY), // CURSOR(PrecScale.NO_NO, false, ExtraSqlTypes.REF_CURSOR, diff --git a/nop-demo/nop-quarkus-demo/src/main/resources/META-INF/native-image/io.nop.demo/nop-quarkus-demo/reflect-config.json b/nop-demo/nop-quarkus-demo/src/main/resources/META-INF/native-image/io.nop.demo/nop-quarkus-demo/reflect-config.json index 0889c6964..b11e22017 100644 --- a/nop-demo/nop-quarkus-demo/src/main/resources/META-INF/native-image/io.nop.demo/nop-quarkus-demo/reflect-config.json +++ b/nop-demo/nop-quarkus-demo/src/main/resources/META-INF/native-image/io.nop.demo/nop-quarkus-demo/reflect-config.json @@ -2864,6 +2864,16 @@ "methods": [], "name": "io.nop.commons.concurrent.executor.ThreadPoolConfig" }, + { + "allDeclaredConstructors": true, + "allDeclaredMethods": true, + "allPublicConstructors": true, + "allPublicFields": true, + "allPublicMethods": true, + "fields": [], + "methods": [], + "name": "io.nop.commons.functional.ChainedAsyncFunctionInvoker" + }, { "allDeclaredConstructors": true, "allDeclaredMethods": true, @@ -6309,6 +6319,26 @@ "methods": [], "name": "io.nop.graphql.core.IBizModelImpl" }, + { + "allDeclaredConstructors": true, + "allDeclaredMethods": true, + "allPublicConstructors": true, + "allPublicFields": true, + "allPublicMethods": true, + "fields": [], + "methods": [], + "name": "io.nop.graphql.core.audit.DefaultGraphQLAuditer" + }, + { + "allDeclaredConstructors": true, + "allDeclaredMethods": true, + "allPublicConstructors": true, + "allPublicFields": true, + "allPublicMethods": true, + "fields": [], + "methods": [], + "name": "io.nop.graphql.core.audit.GraphQLAuditOperationInvoker" + }, { "allDeclaredConstructors": true, "allDeclaredMethods": true, @@ -6366,6 +6396,16 @@ ], "name": "io.nop.graphql.core.engine.GraphQLEngine" }, + { + "allDeclaredConstructors": true, + "allDeclaredMethods": true, + "allPublicConstructors": true, + "allPublicFields": true, + "allPublicMethods": true, + "fields": [], + "methods": [], + "name": "io.nop.graphql.core.engine.GraphQLTransactionOperationInvoker" + }, { "allDeclaredConstructors": true, "allDeclaredMethods": true, @@ -9671,6 +9711,16 @@ "methods": [], "name": "io.nop.sys.dao.entity.NopSysSequence" }, + { + "allDeclaredConstructors": true, + "allDeclaredMethods": true, + "allPublicConstructors": true, + "allPublicFields": true, + "allPublicMethods": true, + "fields": [], + "methods": [], + "name": "io.nop.sys.dao.entity.NopSysServiceInstance" + }, { "allDeclaredConstructors": true, "allDeclaredMethods": true, @@ -9856,6 +9906,21 @@ ], "name": "io.nop.sys.dao.entity._gen._NopSysSequence" }, + { + "allDeclaredConstructors": true, + "allDeclaredMethods": true, + "allPublicConstructors": true, + "allPublicFields": true, + "allPublicMethods": true, + "fields": [], + "methods": [ + { + "name": "newInstance", + "parameterTypes": [] + } + ], + "name": "io.nop.sys.dao.entity._gen._NopSysServiceInstance" + }, { "allDeclaredConstructors": true, "allDeclaredMethods": true, @@ -10076,6 +10141,16 @@ "methods": [], "name": "io.nop.sys.service.entity.NopSysSequenceBizModel" }, + { + "allDeclaredConstructors": true, + "allDeclaredMethods": true, + "allPublicConstructors": true, + "allPublicFields": true, + "allPublicMethods": true, + "fields": [], + "methods": [], + "name": "io.nop.sys.service.entity.NopSysServiceInstanceBizModel" + }, { "allDeclaredConstructors": true, "allDeclaredMethods": true, @@ -10504,7 +10579,14 @@ "allPublicFields": true, "allPublicMethods": true, "fields": [], - "methods": [], + "methods": [ + { + "name": "transformPermissions", + "parameterTypes": [ + "java.lang.Object" + ] + } + ], "name": "io.nop.web.page.PageProvider" }, { diff --git a/nop-demo/nop-quarkus-demo/src/main/resources/nop-vfs-index.txt b/nop-demo/nop-quarkus-demo/src/main/resources/nop-vfs-index.txt index f6285eefe..cdd4eccf3 100644 --- a/nop-demo/nop-quarkus-demo/src/main/resources/nop-vfs-index.txt +++ b/nop-demo/nop-quarkus-demo/src/main/resources/nop-vfs-index.txt @@ -320,6 +320,7 @@ /nop/core/xlib/biz!when.xlib /nop/core/xlib/biz-gen.xlib /nop/core/xlib/biz.xlib +/nop/core/xlib/dsl-gen.xlib /nop/core/xlib/file.xlib /nop/core/xlib/filter.xlib /nop/core/xlib/meta-gen.xlib @@ -854,6 +855,10 @@ /nop/sys/model/NopSysSequence/NopSysSequence.xmeta /nop/sys/model/NopSysSequence/_NopSysSequence.xbiz /nop/sys/model/NopSysSequence/_NopSysSequence.xmeta +/nop/sys/model/NopSysServiceInstance/NopSysServiceInstance.xbiz +/nop/sys/model/NopSysServiceInstance/NopSysServiceInstance.xmeta +/nop/sys/model/NopSysServiceInstance/_NopSysServiceInstance.xbiz +/nop/sys/model/NopSysServiceInstance/_NopSysServiceInstance.xmeta /nop/sys/model/NopSysUserVariable/NopSysUserVariable.xbiz /nop/sys/model/NopSysUserVariable/NopSysUserVariable.xmeta /nop/sys/model/NopSysUserVariable/_NopSysUserVariable.xbiz @@ -929,6 +934,11 @@ /nop/sys/pages/NopSysSequence/_gen/_NopSysSequence.view.xml /nop/sys/pages/NopSysSequence/main.page.yaml /nop/sys/pages/NopSysSequence/picker.page.yaml +/nop/sys/pages/NopSysServiceInstance/NopSysServiceInstance.lib.xjs +/nop/sys/pages/NopSysServiceInstance/NopSysServiceInstance.view.xml +/nop/sys/pages/NopSysServiceInstance/_gen/_NopSysServiceInstance.view.xml +/nop/sys/pages/NopSysServiceInstance/main.page.yaml +/nop/sys/pages/NopSysServiceInstance/picker.page.yaml /nop/sys/pages/NopSysUserVariable/NopSysUserVariable.lib.js /nop/sys/pages/NopSysUserVariable/NopSysUserVariable.lib.xjs /nop/sys/pages/NopSysUserVariable/NopSysUserVariable.view.xml diff --git a/nop-dyn/nop-dyn-service/src/test/java/io/nop/dyn/service/codegen/TestDynCodeGen.java b/nop-dyn/nop-dyn-service/src/test/java/io/nop/dyn/service/codegen/TestDynCodeGen.java index fa452a6f2..0eb63ed29 100644 --- a/nop-dyn/nop-dyn-service/src/test/java/io/nop/dyn/service/codegen/TestDynCodeGen.java +++ b/nop-dyn/nop-dyn-service/src/test/java/io/nop/dyn/service/codegen/TestDynCodeGen.java @@ -79,6 +79,8 @@ public void testGen() { codeGen.reloadModel(); }); + // bizObjectManager.getBizObject("MyDynEntity_one"); + ormTemplate.runInSession(() -> { IEntityDao dao = daoProvider.dao("MyDynEntity"); diff --git a/nop-dyn/nop-dyn-service/src/test/resources/_vfs/_delta/default/nop/core/xlib/meta-reload.xlib b/nop-dyn/nop-dyn-service/src/test/resources/_vfs/_delta/default/nop/core/xlib/meta-reload.xlib new file mode 100644 index 000000000..efeb5b56a --- /dev/null +++ b/nop-dyn/nop-dyn-service/src/test/resources/_vfs/_delta/default/nop/core/xlib/meta-reload.xlib @@ -0,0 +1,50 @@ + + + + + + + + + + + logInfo("DefaultM2222___{}", _dsl_root) + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/nop-dyn/nop-dyn-service/src/test/resources/_vfs/_delta/default/nop/templates/dyn/{moduleId}/model/{entityMeta.bizObjName}/{!!entityModel}{entityModel.shortName}_one.xmeta.xgen b/nop-dyn/nop-dyn-service/src/test/resources/_vfs/_delta/default/nop/templates/dyn/{moduleId}/model/{entityMeta.bizObjName}/{!!entityModel}{entityModel.shortName}_one.xmeta.xgen new file mode 100644 index 000000000..657391587 --- /dev/null +++ b/nop-dyn/nop-dyn-service/src/test/resources/_vfs/_delta/default/nop/templates/dyn/{moduleId}/model/{entityMeta.bizObjName}/{!!entityModel}{entityModel.shortName}_one.xmeta.xgen @@ -0,0 +1,26 @@ + + ${entityModel.name} + + ${_.join(entityModel.pkColumnNames,',')} + const dispCol = entityModel.getColumnByTag('disp') + ${dispCol.name} + + + + + + + + + + + + + + + + + + + + diff --git a/nop-message/nop-message-core/src/main/java/io/nop/message/core/MessageCoreConstants.java b/nop-message/nop-message-core/src/main/java/io/nop/message/core/MessageCoreConstants.java index af03d090c..ef29c931e 100644 --- a/nop-message/nop-message-core/src/main/java/io/nop/message/core/MessageCoreConstants.java +++ b/nop-message/nop-message-core/src/main/java/io/nop/message/core/MessageCoreConstants.java @@ -2,4 +2,8 @@ public interface MessageCoreConstants { String BEAN_LOCAL_MESSAGE_SERVICE = "nopLocalMessageService"; + + String TOPIC_PREFIX_BROADCAST = "bro-"; + String TOPIC_PREFIX_BATCH = "bat-"; + String TOPIC_PREFIX_REPLY = "reply-"; } diff --git a/nop-message/nop-message-core/src/main/java/io/nop/message/core/local/LocalMessageService.java b/nop-message/nop-message-core/src/main/java/io/nop/message/core/local/LocalMessageService.java index 48e996e09..389c0ee84 100644 --- a/nop-message/nop-message-core/src/main/java/io/nop/message/core/local/LocalMessageService.java +++ b/nop-message/nop-message-core/src/main/java/io/nop/message/core/local/LocalMessageService.java @@ -9,11 +9,14 @@ import io.nop.api.core.message.MessageSubscribeOptions; import io.nop.api.core.util.FutureHelper; import io.nop.api.core.util.Guard; +import io.nop.message.core.MessageCoreConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -117,6 +120,26 @@ private Subscription findSubscription(List subscriptions, IMessage return null; } + public Set getBroadcastTopics() { + Set ret = new TreeSet<>(); + for (String topic : consumers.keySet()) { + if (topic.startsWith(MessageCoreConstants.TOPIC_PREFIX_BROADCAST)) { + ret.add(topic); + } + } + return ret; + } + + public Set getNonBroadcastTopics() { + Set ret = new TreeSet<>(); + for (String topic : consumers.keySet()) { + if (!topic.startsWith(MessageCoreConstants.TOPIC_PREFIX_BROADCAST)) { + ret.add(topic); + } + } + return ret; + } + @Override public IMessageSubscription subscribe(String topic, IMessageConsumer listener, MessageSubscribeOptions options) { Guard.notEmpty(topic, "topic"); diff --git a/nop-sys/model/nop-sys.orm.xlsx b/nop-sys/model/nop-sys.orm.xlsx index 842fe61bb..85e2ddb4a 100644 Binary files a/nop-sys/model/nop-sys.orm.xlsx and b/nop-sys/model/nop-sys.orm.xlsx differ diff --git a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/NopSysDaoConstants.java b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/NopSysDaoConstants.java index 87b4404c3..ac03d4e71 100644 --- a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/NopSysDaoConstants.java +++ b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/NopSysDaoConstants.java @@ -16,7 +16,5 @@ public interface NopSysDaoConstants extends _NopSysDaoConstants { String RESOURCE_GROUP_DEFAULT = "default"; - String TOPIC_PREFIX_BROADCAST = "bro-"; - String TOPIC_PREFIX_BATCH = "bat-"; - String TOPIC_PREFIX_REPLY = "reply-"; + } diff --git a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/entity/NopSysEvent.java b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/entity/NopSysEvent.java index 99ad7001f..b8f9b6d68 100644 --- a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/entity/NopSysEvent.java +++ b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/entity/NopSysEvent.java @@ -5,7 +5,9 @@ @BizObjName("NopSysEvent") -public class NopSysEvent extends _NopSysEvent{ +public class NopSysEvent extends _NopSysEvent { + public NopSysEvent() { + } } diff --git a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/entity/_gen/_NopSysEvent.java b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/entity/_gen/_NopSysEvent.java index e31e0b194..9c9289b0a 100644 --- a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/entity/_gen/_NopSysEvent.java +++ b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/entity/_gen/_NopSysEvent.java @@ -192,10 +192,10 @@ public class _NopSysEvent extends DynamicOrmEntity{ private java.lang.String _eventName; /* 事件元数据: EVENT_HEADERS */ - private java.lang.Object _eventHeaders; + private java.lang.String _eventHeaders; /* 数据: EVENT_DATA */ - private java.lang.Object _eventData; + private java.lang.String _eventData; /* 字段选择: SELECTION */ private java.lang.String _selection; @@ -424,9 +424,9 @@ public void orm_propValue(int propId, Object value){ } case PROP_ID_eventHeaders:{ - java.lang.Object typedValue = null; + java.lang.String typedValue = null; if(value != null){ - typedValue = ConvertHelper.toObject(value, + typedValue = ConvertHelper.toString(value, err-> newTypeConversionError(PROP_NAME_eventHeaders)); } setEventHeaders(typedValue); @@ -434,9 +434,9 @@ public void orm_propValue(int propId, Object value){ } case PROP_ID_eventData:{ - java.lang.Object typedValue = null; + java.lang.String typedValue = null; if(value != null){ - typedValue = ConvertHelper.toObject(value, + typedValue = ConvertHelper.toString(value, err-> newTypeConversionError(PROP_NAME_eventData)); } setEventData(typedValue); @@ -635,14 +635,14 @@ public void orm_internalSet(int propId, Object value) { case PROP_ID_eventHeaders:{ onInitProp(propId); - this._eventHeaders = (java.lang.Object)value; + this._eventHeaders = (java.lang.String)value; break; } case PROP_ID_eventData:{ onInitProp(propId); - this._eventData = (java.lang.Object)value; + this._eventData = (java.lang.String)value; break; } @@ -825,7 +825,7 @@ public final void setEventName(java.lang.String value){ /** * 事件元数据: EVENT_HEADERS */ - public final java.lang.Object getEventHeaders(){ + public final java.lang.String getEventHeaders(){ onPropGet(PROP_ID_eventHeaders); return _eventHeaders; } @@ -833,7 +833,7 @@ public final java.lang.Object getEventHeaders(){ /** * 事件元数据: EVENT_HEADERS */ - public final void setEventHeaders(java.lang.Object value){ + public final void setEventHeaders(java.lang.String value){ if(onPropSet(PROP_ID_eventHeaders,value)){ this._eventHeaders = value; internalClearRefs(PROP_ID_eventHeaders); @@ -844,7 +844,7 @@ public final void setEventHeaders(java.lang.Object value){ /** * 数据: EVENT_DATA */ - public final java.lang.Object getEventData(){ + public final java.lang.String getEventData(){ onPropGet(PROP_ID_eventData); return _eventData; } @@ -852,7 +852,7 @@ public final java.lang.Object getEventData(){ /** * 数据: EVENT_DATA */ - public final void setEventData(java.lang.Object value){ + public final void setEventData(java.lang.String value){ if(onPropSet(PROP_ID_eventData,value)){ this._eventData = value; internalClearRefs(PROP_ID_eventData); diff --git a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/message/SysDaoMessageService.java b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/message/SysDaoMessageService.java index b0807faf4..ec8e20ed8 100644 --- a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/message/SysDaoMessageService.java +++ b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/message/SysDaoMessageService.java @@ -1,23 +1,21 @@ package io.nop.sys.dao.message; import io.nop.api.core.beans.ApiRequest; +import io.nop.api.core.beans.FilterBeans; +import io.nop.api.core.beans.query.QueryBean; import io.nop.api.core.message.IMessageConsumer; import io.nop.api.core.message.IMessageService; import io.nop.api.core.message.IMessageSubscription; import io.nop.api.core.message.MessageSendOptions; import io.nop.api.core.message.MessageSubscribeOptions; import io.nop.api.core.message.TopicMessage; -import io.nop.api.core.time.CoreMetrics; -import io.nop.api.core.util.ApiHeaders; +import io.nop.api.core.time.IEstimatedClock; import io.nop.api.core.util.FutureHelper; import io.nop.commons.concurrent.executor.GlobalExecutors; import io.nop.commons.concurrent.executor.IScheduledExecutor; import io.nop.commons.concurrent.executor.IThreadPoolExecutor; import io.nop.commons.service.LifeCycleSupport; -import io.nop.commons.util.DateHelper; import io.nop.commons.util.MathHelper; -import io.nop.commons.util.StringHelper; -import io.nop.core.lang.json.JsonTool; import io.nop.dao.api.IDaoProvider; import io.nop.dao.api.IEntityDao; import io.nop.message.core.local.LocalMessageService; @@ -25,8 +23,16 @@ import jakarta.inject.Inject; import java.sql.Timestamp; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class SysDaoMessageService extends LifeCycleSupport implements IMessageService { private IDaoProvider daoProvider; @@ -35,6 +41,18 @@ public class SysDaoMessageService extends LifeCycleSupport implements IMessageSe private IThreadPoolExecutor executor; + private Duration checkInterval = Duration.of(500, ChronoUnit.MILLIS); + + private int fetchSize = 100; + + private int startGap = 5000; + + private Timestamp startTime; + + private Future checkFuture; + + private NopSysEvent lastBroadcastEvent; + private LocalMessageService localService = new LocalMessageService() { @Override public void send(String topic, Object message, MessageSendOptions options) { @@ -42,6 +60,18 @@ public void send(String topic, Object message, MessageSendOptions options) { } }; + public void setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + } + + public void setStartGap(int startGap) { + this.startGap = startGap; + } + + public void setCheckInterval(Duration checkInterval) { + this.checkInterval = checkInterval; + } + public void setExecutor(IThreadPoolExecutor executor) { this.executor = executor; } @@ -62,61 +92,70 @@ public void doStart() { timer = GlobalExecutors.globalTimer(); if (executor == null) executor = GlobalExecutors.globalWorker(); + + IEstimatedClock clock = dao().getDbEstimatedClock(); + startTime = new Timestamp(clock.getMinCurrentTimeMillis() - startGap); + + checkFuture = timer.executeOn(executor).scheduleWithFixedDelay(this::processBroadcastEvent, + checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS); } @Override public void doStop() { + if (checkFuture != null) + checkFuture.cancel(false); } - protected IEntityDao dao() { - return daoProvider.daoFor(NopSysEvent.class); - } + protected void processBroadcastEvent() { + do { + List events = fetchBroadcastEvents(); + if (events.isEmpty()) + break; - protected void toEvent(NopSysEvent event, String topic, Object message, long eventTime) { - event.setEventTopic(topic); - event.setEventStatus(0); - event.setEventTime(new Timestamp(eventTime)); - event.setBizDate(DateHelper.millisToDate(eventTime)); - event.setEventName(message.getClass().getSimpleName()); - event.setProcessTime(event.getEventTime()); - - if (message instanceof ApiRequest) { - ApiRequest request = (ApiRequest) message; - String bizKey = ApiHeaders.getBizKey(request); - String bizObjName = ApiHeaders.getSvcName(request); - event.setBizKey(bizKey); - event.setBizObjName(bizObjName); - if (bizKey != null) { - event.setPartitionIndex((int) StringHelper.shortHash(bizObjName + '|' + bizKey)); - } - String svcAction = ApiHeaders.getSvcAction(request); - if (svcAction != null) { - event.setEventName(svcAction); - - if (bizObjName == null) { - int pos = svcAction.indexOf('_'); - if (pos > 0) { - bizObjName = svcAction.substring(0, pos); - event.setBizObjName(bizObjName); - } - } + for (NopSysEvent event : events) { + ApiRequest> request = fromSysEvent(event); + localService.invokeMessageListener(event.getEventTopic(), request, null); } - if (request.getHeaders() != null) - event.setEventHeaders(JsonTool.stringify(request.getHeaders())); - if (request.getSelection() != null) { - event.setSelection(request.getSelection().toString()); - } - if (request.getData() != null) - event.setEventData(JsonTool.stringify(request.getData())); + } while (true); + } + + protected List fetchBroadcastEvents() { + Set topics = getBroadcastTopics(); + if (topics.isEmpty()) + return Collections.emptyList(); + + // 按照eventId从小到大处理 + IEntityDao dao = dao(); + QueryBean query = new QueryBean(); + query.addFilter(FilterBeans.in(NopSysEvent.PROP_NAME_eventTopic, topics)); + query.addFilter(FilterBeans.gt(NopSysEvent.PROP_NAME_eventTime, startTime)); + query.setFilter(FilterBeans.eq(NopSysEvent.PROP_NAME_isBroadcast, true)); + query.addOrderField(NopSysEvent.PROP_NAME_eventId, true); + + List list = dao.findNext(lastBroadcastEvent, query.getFilter(), query.getOrderBy(), fetchSize); + if (!list.isEmpty()) { + lastBroadcastEvent = list.get(list.size() - 1); } + return list; + } + + protected Set getBroadcastTopics() { + return localService.getBroadcastTopics(); + } + + protected IEntityDao dao() { + return daoProvider.daoFor(NopSysEvent.class); } + @Override public CompletionStage sendAsync(String topic, Object message, MessageSendOptions options) { IEntityDao dao = dao(); + IEstimatedClock clock = dao.getDbEstimatedClock(); + NopSysEvent event = dao.newEntity(); event.setPartitionIndex(MathHelper.random().nextInt(Short.MAX_VALUE)); - toEvent(event, topic, message, CoreMetrics.currentTimeMillis()); + toSysEvent(event, topic, message, clock.getMaxCurrentTimeMillis()); try { dao.saveEntityDirectly(event); @@ -129,12 +168,14 @@ public CompletionStage sendAsync(String topic, Object message, MessageSend @Override public CompletionStage sendMultiAsync(Collection messages, MessageSendOptions options) { IEntityDao dao = dao(); - long currentTime = CoreMetrics.currentTimeMillis(); + IEstimatedClock clock = dao.getDbEstimatedClock(); + + long currentTime = clock.getMaxCurrentTimeMillis(); try { for (TopicMessage message : messages) { NopSysEvent event = dao.newEntity(); event.setPartitionIndex(MathHelper.random().nextInt(Short.MAX_VALUE)); - toEvent(event, message.getTopic(), message.getMessage(), currentTime); + toSysEvent(event, message.getTopic(), message.getMessage(), currentTime); dao.saveEntityDirectly(event); } } catch (Exception e) { @@ -143,6 +184,14 @@ public CompletionStage sendMultiAsync(Collection messages, M return FutureHelper.success(null); } + protected ApiRequest> fromSysEvent(NopSysEvent event) { + return SysEventHelper.fromSysEvent(event); + } + + protected void toSysEvent(NopSysEvent event, String topic, Object message, long eventTime) { + SysEventHelper.toSysEvent(event, topic, message, eventTime); + } + @Override public IMessageSubscription subscribe(String topic, IMessageConsumer listener, MessageSubscribeOptions options) { return localService.subscribe(topic, listener, options); diff --git a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/message/SysEventHelper.java b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/message/SysEventHelper.java new file mode 100644 index 000000000..1706d43ad --- /dev/null +++ b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/message/SysEventHelper.java @@ -0,0 +1,70 @@ +package io.nop.sys.dao.message; + +import io.nop.api.core.beans.ApiRequest; +import io.nop.api.core.util.ApiHeaders; +import io.nop.commons.util.DateHelper; +import io.nop.commons.util.StringHelper; +import io.nop.core.lang.json.JsonTool; +import io.nop.core.model.selection.FieldSelectionBeanParser; +import io.nop.sys.dao.entity.NopSysEvent; + +import java.sql.Timestamp; +import java.util.Map; + +import static io.nop.core.lang.json.JsonTool.parseMap; +import static io.nop.message.core.MessageCoreConstants.TOPIC_PREFIX_BROADCAST; + +public class SysEventHelper { + public static ApiRequest> fromSysEvent(NopSysEvent event) { + ApiRequest> request = new ApiRequest<>(); + request.setSelection(FieldSelectionBeanParser.fromText(null, event.getSelection())); + request.setHeaders(parseMap(event.getEventHeaders())); + request.setData(parseMap(event.getEventData())); + ApiHeaders.setBizKey(request, event.getBizKey()); + ApiHeaders.setSvcName(request, event.getBizObjName()); + ApiHeaders.setTopic(request, event.getEventTopic()); + ApiHeaders.setEventTime(request, event.getEventTime()); + ApiHeaders.setProcessTime(request, event.getProcessTime()); + return request; + } + + public static void toSysEvent(NopSysEvent event, String topic, Object message, long eventTime) { + event.setEventTopic(topic); + event.setEventStatus(0); + event.setEventTime(new Timestamp(eventTime)); + event.setBizDate(DateHelper.millisToDate(eventTime)); + event.setEventName(message.getClass().getSimpleName()); + event.setProcessTime(event.getEventTime()); + event.setIsBroadcast(topic.startsWith(TOPIC_PREFIX_BROADCAST)); + + if (message instanceof ApiRequest) { + ApiRequest request = (ApiRequest) message; + String bizKey = ApiHeaders.getBizKey(request); + String bizObjName = ApiHeaders.getSvcName(request); + event.setBizKey(bizKey); + event.setBizObjName(bizObjName); + if (bizKey != null) { + event.setPartitionIndex((int) StringHelper.shortHash(bizObjName + '|' + bizKey)); + } + String svcAction = ApiHeaders.getSvcAction(request); + if (svcAction != null) { + event.setEventName(svcAction); + + if (bizObjName == null) { + int pos = svcAction.indexOf('_'); + if (pos > 0) { + bizObjName = svcAction.substring(0, pos); + event.setBizObjName(bizObjName); + } + } + } + if (request.getHeaders() != null) + event.setEventHeaders(JsonTool.stringify(request.getHeaders())); + if (request.getSelection() != null) { + event.setSelection(request.getSelection().toString()); + } + if (request.getData() != null) + event.setEventData(JsonTool.stringify(request.getData())); + } + } +} diff --git a/nop-sys/nop-sys-dao/src/main/resources/_vfs/nop/sys/orm/_app.orm.xml b/nop-sys/nop-sys-dao/src/main/resources/_vfs/nop/sys/orm/_app.orm.xml index aa2a0883a..d6e03c1d9 100644 --- a/nop-sys/nop-sys-dao/src/main/resources/_vfs/nop/sys/orm/_app.orm.xml +++ b/nop-sys/nop-sys-dao/src/main/resources/_vfs/nop/sys/orm/_app.orm.xml @@ -497,9 +497,9 @@ + propId="4" stdDataType="string" stdSqlType="JSON" i18n-en:displayName="Headers"/> + stdDataType="string" stdSqlType="JSON" i18n-en:displayName="Data"/> - + - +