From 060340a40974bc20ec81419a300da26bfc66551d Mon Sep 17 00:00:00 2001 From: canonical Date: Mon, 20 Jan 2025 00:18:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0SysDaoNamingService?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../nop/cluster/naming/AutoRegistration.java | 40 ++++- .../cluster/naming/CachingNamingService.java | 60 +++++++ .../io/nop/cluster/naming/INamingService.java | 4 + .../sys/dao/naming/SysDaoNamingService.java | 164 ++++++++++++++++++ 4 files changed, 266 insertions(+), 2 deletions(-) create mode 100644 nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/CachingNamingService.java create mode 100644 nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/naming/SysDaoNamingService.java diff --git a/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/AutoRegistration.java b/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/AutoRegistration.java index 161a6eef8..d8111f39f 100644 --- a/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/AutoRegistration.java +++ b/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/AutoRegistration.java @@ -11,14 +11,18 @@ import io.nop.api.core.config.AppConfig; import io.nop.api.core.exceptions.NopException; import io.nop.cluster.discovery.ServiceInstance; +import io.nop.commons.concurrent.executor.GlobalExecutors; import io.nop.commons.io.net.IServerAddrFinder; import io.nop.commons.util.StringHelper; - import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; + +import java.time.Duration; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static io.nop.cluster.ClusterErrors.ARG_VERSION; import static io.nop.cluster.ClusterErrors.ERR_CLUSTER_APP_VERSION_MUST_BE_NPM_LIKE; @@ -43,6 +47,12 @@ public class AutoRegistration { private ServiceInstance instance; + private Duration autoUpdateInterval; + + private Future autoUpdateTimerFuture; + + private boolean ephemeral = true; + public AutoRegistration(INamingService namingService) { this.namingService = namingService; } @@ -60,6 +70,14 @@ public void setAddrFinder(IServerAddrFinder addrFinder) { this.addrFinder = addrFinder; } + public boolean isEphemeral() { + return ephemeral; + } + + public void setEphemeral(boolean ephemeral) { + this.ephemeral = ephemeral; + } + public String getServiceName() { return serviceName; } @@ -116,10 +134,14 @@ public void setMetadata(Map metadata) { this.metadata = metadata; } + public void setAutoUpdateInterval(Duration autoUpdateInterval) { + this.autoUpdateInterval = autoUpdateInterval; + } + protected ServiceInstance getServiceInstance() { ServiceInstance instance = new ServiceInstance(); instance.setEnabled(true); - instance.setEphemeral(true); + instance.setEphemeral(ephemeral); String serviceName = this.serviceName; if (serviceName == null) @@ -156,10 +178,24 @@ public void start() { } } namingService.registerInstance(getServiceInstance()); + + if (autoUpdateInterval != null && autoUpdateInterval.toMillis() > 0) { + autoUpdateTimerFuture = GlobalExecutors.globalTimer().executeOn(GlobalExecutors.globalWorker()) + .scheduleWithFixedDelay(this::refreshRegistration, autoUpdateInterval.toMillis(), autoUpdateInterval.toMillis(), TimeUnit.MILLISECONDS); + } + } + + protected void refreshRegistration() { + if (instance != null) { + namingService.updateInstance(getServiceInstance()); + } } @PreDestroy public void stop() { + if (autoUpdateTimerFuture != null) + autoUpdateTimerFuture.cancel(false); + autoUpdateTimerFuture = null; if (instance == null) return; namingService.unregisterInstance(instance); diff --git a/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/CachingNamingService.java b/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/CachingNamingService.java new file mode 100644 index 000000000..e82d35626 --- /dev/null +++ b/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/CachingNamingService.java @@ -0,0 +1,60 @@ +package io.nop.cluster.naming; + +import io.nop.api.core.util.FutureHelper; +import io.nop.cluster.discovery.ServiceInstance; +import io.nop.commons.cache.CacheConfig; +import io.nop.commons.cache.LocalCache; + +import java.util.List; +import java.util.concurrent.CompletionStage; + +public class CachingNamingService implements INamingService { + private final INamingService service; + private final LocalCache> cache; + + public CachingNamingService(INamingService service, long cacheTimeout) { + this.service = service; + this.cache = LocalCache.newCache("naming-service-cache", CacheConfig.newConfig(1000, cacheTimeout)); + } + + @Override + public void registerInstance(ServiceInstance instance) { + service.registerInstance(instance); + } + + @Override + public void unregisterInstance(ServiceInstance instance) { + service.unregisterInstance(instance); + } + + @Override + public List getServices() { + return service.getServices(); + } + + @Override + public List getInstances(String serviceName) { + return cache.computeIfAbsent(serviceName, k -> service.getInstances(serviceName)); + } + + @Override + public void updateInstance(ServiceInstance instance) { + service.updateInstance(instance); + } + + @Override + public CompletionStage> getInstancesAsync(String serviceName) { + List instances = cache.get(serviceName); + if (instances != null) + return FutureHelper.success(instances); + return service.getInstancesAsync(serviceName).thenApply(ret -> { + cache.put(serviceName, ret); + return ret; + }); + } + + @Override + public int order() { + return service.order(); + } +} \ No newline at end of file diff --git a/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/INamingService.java b/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/INamingService.java index 84bcdbec4..935dcd4f9 100644 --- a/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/INamingService.java +++ b/nop-cluster/nop-cluster-core/src/main/java/io/nop/cluster/naming/INamingService.java @@ -20,6 +20,10 @@ public interface INamingService extends IDiscoveryClient { void unregisterInstance(ServiceInstance instance); + default void updateInstance(ServiceInstance instance) { + + } + List getServices(); List getInstances(String serviceName); diff --git a/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/naming/SysDaoNamingService.java b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/naming/SysDaoNamingService.java new file mode 100644 index 000000000..9756594d8 --- /dev/null +++ b/nop-sys/nop-sys-dao/src/main/java/io/nop/sys/dao/naming/SysDaoNamingService.java @@ -0,0 +1,164 @@ +package io.nop.sys.dao.naming; + +import io.nop.api.core.beans.FilterBeans; +import io.nop.api.core.beans.query.QueryBean; +import io.nop.api.core.beans.query.QueryFieldBean; +import io.nop.api.core.time.IEstimatedClock; +import io.nop.cluster.discovery.ServiceInstance; +import io.nop.cluster.naming.INamingService; +import io.nop.commons.concurrent.executor.GlobalExecutors; +import io.nop.commons.util.TagsHelper; +import io.nop.core.lang.json.JsonTool; +import io.nop.dao.api.IDaoProvider; +import io.nop.dao.api.IEntityDao; +import io.nop.sys.dao.entity.NopSysServiceInstance; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.inject.Inject; + +import java.sql.Timestamp; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class SysDaoNamingService implements INamingService { + + private IDaoProvider daoProvider; + private Duration autoUpdateInterval; + private Duration cleanupInterval; + private Future cleanupFuture; + + public void setAutoUpdateInterval(Duration autoUpdateInterval) { + this.autoUpdateInterval = autoUpdateInterval; + } + + public void setCleanupInterval(Duration cleanupInterval) { + this.cleanupInterval = cleanupInterval; + } + + @Inject + public void setDaoProvider(IDaoProvider daoProvider) { + this.daoProvider = daoProvider; + } + + IEntityDao dao() { + return daoProvider.daoFor(NopSysServiceInstance.class); + } + + long getMaxUpdateInterval() { + if (autoUpdateInterval != null) + return autoUpdateInterval.toMillis() + 1000; + return 60 * 1000; + } + + @PostConstruct + public void init() { + if (cleanupInterval != null) { + cleanupFuture = GlobalExecutors.globalTimer().scheduleWithFixedDelay(this::cleanup, cleanupInterval.toMillis(), cleanupInterval.toMillis(), TimeUnit.MILLISECONDS); + } + } + + @PreDestroy + public void destroy() { + if (cleanupFuture != null) { + cleanupFuture.cancel(false); + cleanupFuture = null; + } + } + + void cleanup() { + IEntityDao dao = dao(); + QueryBean query = new QueryBean(); + query.addFilter(FilterBeans.lt(NopSysServiceInstance.PROP_NAME_updateTime, new Timestamp(System.currentTimeMillis() - 2 * getMaxUpdateInterval()))); + query.addFilter(FilterBeans.eq(NopSysServiceInstance.PROP_NAME_isEphemeral, true)); + dao.deleteByQuery(query); + } + + @Override + public void registerInstance(ServiceInstance instance) { + IEntityDao dao = dao(); + IEstimatedClock clock = dao.getDbEstimatedClock(); + NopSysServiceInstance entity = dao.getEntityById(instance.getInstanceId()); + if (entity != null) { + copyToEntity(entity, instance); + entity.setUpdateTime(clock.getMaxCurrentTime()); + dao.updateEntity(entity); + } else { + entity = toEntity(instance); + dao.saveEntity(entity); + } + } + + @Override + public void updateInstance(ServiceInstance instance) { + registerInstance(instance); + } + + @Override + public void unregisterInstance(ServiceInstance instance) { + dao().deleteEntityById(instance.getInstanceId()); + } + + @Override + public List getServices() { + IEntityDao dao = dao(); + IEstimatedClock clock = dao.getDbEstimatedClock(); + QueryBean query = new QueryBean(); + query.distinct().addField(QueryFieldBean.forField(NopSysServiceInstance.PROP_NAME_serviceName)); + query.addFilter(FilterBeans.gt(NopSysServiceInstance.PROP_NAME_updateTime, new Timestamp(clock.getMinCurrentTimeMillis() - getMaxUpdateInterval()))); + return dao.selectStringFieldByQuery(query); + } + + @Override + public List getInstances(String serviceName) { + QueryBean query = new QueryBean(); + query.addFilter(FilterBeans.eq(NopSysServiceInstance.PROP_NAME_serviceName, serviceName)); + + // 如果长时间没有更新,则认为服务实例已经失效 + query.addFilter(FilterBeans.gt(NopSysServiceInstance.PROP_NAME_updateTime, new Timestamp(System.currentTimeMillis() - getMaxUpdateInterval()))); + return dao().findAllByQuery(query).stream().map(this::fromEntity).collect(Collectors.toList()); + } + + protected NopSysServiceInstance toEntity(ServiceInstance instance) { + NopSysServiceInstance entity = new NopSysServiceInstance(); + copyToEntity(entity, instance); + return entity; + } + + protected void copyToEntity(NopSysServiceInstance entity, ServiceInstance instance) { + entity.setInstanceId(instance.getInstanceId()); + entity.setServiceName(instance.getServiceName()); + entity.setServerAddr(instance.getAddr()); + entity.setServerPort(instance.getPort()); + entity.setClusterName(instance.getClusterName()); + entity.setGroupName(instance.getGroupName()); + entity.setIsEnabled(instance.isEnabled()); + entity.setIsEphemeral(instance.isEphemeral()); + entity.setIsHealthy(instance.isHealthy()); + if (instance.getMetadata() != null) + entity.setMetaData(JsonTool.serialize(instance.getMetadata(), false)); + entity.setTagsText(TagsHelper.toString(instance.getTags())); + entity.setWeight(instance.getWeight()); + } + + protected ServiceInstance fromEntity(NopSysServiceInstance entity) { + ServiceInstance instance = new ServiceInstance(); + instance.setInstanceId(entity.getInstanceId()); + instance.setServiceName(entity.getServiceName()); + instance.setAddr(entity.getServerAddr()); + instance.setPort(entity.getServerPort()); + instance.setClusterName(entity.getClusterName()); + instance.setGroupName(entity.getGroupName()); + instance.setEnabled(entity.getIsEnabled()); + instance.setEphemeral(entity.getIsEphemeral()); + instance.setHealthy(entity.getIsHealthy()); + instance.setWeight(entity.getWeight()); + if (entity.getMetaData() != null) + instance.setMetadata((Map) JsonTool.parseNonStrict(entity.getMetaData())); + instance.setTags(TagsHelper.parse(entity.getTagsText(), ',')); + return instance; + } +} \ No newline at end of file