Skip to content

Commit

Permalink
增加SysDaoNamingService
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Jan 19, 2025
1 parent d1571cd commit 060340a
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
import io.nop.api.core.config.AppConfig;
import io.nop.api.core.exceptions.NopException;
import io.nop.cluster.discovery.ServiceInstance;
import io.nop.commons.concurrent.executor.GlobalExecutors;
import io.nop.commons.io.net.IServerAddrFinder;
import io.nop.commons.util.StringHelper;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static io.nop.cluster.ClusterErrors.ARG_VERSION;
import static io.nop.cluster.ClusterErrors.ERR_CLUSTER_APP_VERSION_MUST_BE_NPM_LIKE;
Expand All @@ -43,6 +47,12 @@ public class AutoRegistration {

private ServiceInstance instance;

private Duration autoUpdateInterval;

private Future<?> autoUpdateTimerFuture;

private boolean ephemeral = true;

public AutoRegistration(INamingService namingService) {
this.namingService = namingService;
}
Expand All @@ -60,6 +70,14 @@ public void setAddrFinder(IServerAddrFinder addrFinder) {
this.addrFinder = addrFinder;
}

public boolean isEphemeral() {
return ephemeral;
}

public void setEphemeral(boolean ephemeral) {
this.ephemeral = ephemeral;
}

public String getServiceName() {
return serviceName;
}
Expand Down Expand Up @@ -116,10 +134,14 @@ public void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}

public void setAutoUpdateInterval(Duration autoUpdateInterval) {
this.autoUpdateInterval = autoUpdateInterval;
}

protected ServiceInstance getServiceInstance() {
ServiceInstance instance = new ServiceInstance();
instance.setEnabled(true);
instance.setEphemeral(true);
instance.setEphemeral(ephemeral);

String serviceName = this.serviceName;
if (serviceName == null)
Expand Down Expand Up @@ -156,10 +178,24 @@ public void start() {
}
}
namingService.registerInstance(getServiceInstance());

if (autoUpdateInterval != null && autoUpdateInterval.toMillis() > 0) {
autoUpdateTimerFuture = GlobalExecutors.globalTimer().executeOn(GlobalExecutors.globalWorker())
.scheduleWithFixedDelay(this::refreshRegistration, autoUpdateInterval.toMillis(), autoUpdateInterval.toMillis(), TimeUnit.MILLISECONDS);
}
}

protected void refreshRegistration() {
if (instance != null) {
namingService.updateInstance(getServiceInstance());
}
}

@PreDestroy
public void stop() {
if (autoUpdateTimerFuture != null)
autoUpdateTimerFuture.cancel(false);
autoUpdateTimerFuture = null;
if (instance == null)
return;
namingService.unregisterInstance(instance);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.nop.cluster.naming;

import io.nop.api.core.util.FutureHelper;
import io.nop.cluster.discovery.ServiceInstance;
import io.nop.commons.cache.CacheConfig;
import io.nop.commons.cache.LocalCache;

import java.util.List;
import java.util.concurrent.CompletionStage;

public class CachingNamingService implements INamingService {
private final INamingService service;
private final LocalCache<String, List<ServiceInstance>> cache;

public CachingNamingService(INamingService service, long cacheTimeout) {
this.service = service;
this.cache = LocalCache.newCache("naming-service-cache", CacheConfig.newConfig(1000, cacheTimeout));
}

@Override
public void registerInstance(ServiceInstance instance) {
service.registerInstance(instance);
}

@Override
public void unregisterInstance(ServiceInstance instance) {
service.unregisterInstance(instance);
}

@Override
public List<String> getServices() {
return service.getServices();
}

@Override
public List<ServiceInstance> getInstances(String serviceName) {
return cache.computeIfAbsent(serviceName, k -> service.getInstances(serviceName));
}

@Override
public void updateInstance(ServiceInstance instance) {
service.updateInstance(instance);
}

@Override
public CompletionStage<List<ServiceInstance>> getInstancesAsync(String serviceName) {
List<ServiceInstance> instances = cache.get(serviceName);
if (instances != null)
return FutureHelper.success(instances);
return service.getInstancesAsync(serviceName).thenApply(ret -> {
cache.put(serviceName, ret);
return ret;
});
}

@Override
public int order() {
return service.order();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public interface INamingService extends IDiscoveryClient {

void unregisterInstance(ServiceInstance instance);

default void updateInstance(ServiceInstance instance) {

}

List<String> getServices();

List<ServiceInstance> getInstances(String serviceName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package io.nop.sys.dao.naming;

import io.nop.api.core.beans.FilterBeans;
import io.nop.api.core.beans.query.QueryBean;
import io.nop.api.core.beans.query.QueryFieldBean;
import io.nop.api.core.time.IEstimatedClock;
import io.nop.cluster.discovery.ServiceInstance;
import io.nop.cluster.naming.INamingService;
import io.nop.commons.concurrent.executor.GlobalExecutors;
import io.nop.commons.util.TagsHelper;
import io.nop.core.lang.json.JsonTool;
import io.nop.dao.api.IDaoProvider;
import io.nop.dao.api.IEntityDao;
import io.nop.sys.dao.entity.NopSysServiceInstance;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class SysDaoNamingService implements INamingService {

private IDaoProvider daoProvider;
private Duration autoUpdateInterval;
private Duration cleanupInterval;
private Future<?> cleanupFuture;

public void setAutoUpdateInterval(Duration autoUpdateInterval) {
this.autoUpdateInterval = autoUpdateInterval;
}

public void setCleanupInterval(Duration cleanupInterval) {
this.cleanupInterval = cleanupInterval;
}

@Inject
public void setDaoProvider(IDaoProvider daoProvider) {
this.daoProvider = daoProvider;
}

IEntityDao<NopSysServiceInstance> dao() {
return daoProvider.daoFor(NopSysServiceInstance.class);
}

long getMaxUpdateInterval() {
if (autoUpdateInterval != null)
return autoUpdateInterval.toMillis() + 1000;
return 60 * 1000;
}

@PostConstruct
public void init() {
if (cleanupInterval != null) {
cleanupFuture = GlobalExecutors.globalTimer().scheduleWithFixedDelay(this::cleanup, cleanupInterval.toMillis(), cleanupInterval.toMillis(), TimeUnit.MILLISECONDS);
}
}

@PreDestroy
public void destroy() {
if (cleanupFuture != null) {
cleanupFuture.cancel(false);
cleanupFuture = null;
}
}

void cleanup() {
IEntityDao<NopSysServiceInstance> dao = dao();
QueryBean query = new QueryBean();
query.addFilter(FilterBeans.lt(NopSysServiceInstance.PROP_NAME_updateTime, new Timestamp(System.currentTimeMillis() - 2 * getMaxUpdateInterval())));
query.addFilter(FilterBeans.eq(NopSysServiceInstance.PROP_NAME_isEphemeral, true));
dao.deleteByQuery(query);
}

@Override
public void registerInstance(ServiceInstance instance) {
IEntityDao<NopSysServiceInstance> dao = dao();
IEstimatedClock clock = dao.getDbEstimatedClock();
NopSysServiceInstance entity = dao.getEntityById(instance.getInstanceId());
if (entity != null) {
copyToEntity(entity, instance);
entity.setUpdateTime(clock.getMaxCurrentTime());
dao.updateEntity(entity);
} else {
entity = toEntity(instance);
dao.saveEntity(entity);
}
}

@Override
public void updateInstance(ServiceInstance instance) {
registerInstance(instance);
}

@Override
public void unregisterInstance(ServiceInstance instance) {
dao().deleteEntityById(instance.getInstanceId());
}

@Override
public List<String> getServices() {
IEntityDao<NopSysServiceInstance> dao = dao();
IEstimatedClock clock = dao.getDbEstimatedClock();
QueryBean query = new QueryBean();
query.distinct().addField(QueryFieldBean.forField(NopSysServiceInstance.PROP_NAME_serviceName));
query.addFilter(FilterBeans.gt(NopSysServiceInstance.PROP_NAME_updateTime, new Timestamp(clock.getMinCurrentTimeMillis() - getMaxUpdateInterval())));
return dao.selectStringFieldByQuery(query);
}

@Override
public List<ServiceInstance> getInstances(String serviceName) {
QueryBean query = new QueryBean();
query.addFilter(FilterBeans.eq(NopSysServiceInstance.PROP_NAME_serviceName, serviceName));

// 如果长时间没有更新,则认为服务实例已经失效
query.addFilter(FilterBeans.gt(NopSysServiceInstance.PROP_NAME_updateTime, new Timestamp(System.currentTimeMillis() - getMaxUpdateInterval())));
return dao().findAllByQuery(query).stream().map(this::fromEntity).collect(Collectors.toList());
}

protected NopSysServiceInstance toEntity(ServiceInstance instance) {
NopSysServiceInstance entity = new NopSysServiceInstance();
copyToEntity(entity, instance);
return entity;
}

protected void copyToEntity(NopSysServiceInstance entity, ServiceInstance instance) {
entity.setInstanceId(instance.getInstanceId());
entity.setServiceName(instance.getServiceName());
entity.setServerAddr(instance.getAddr());
entity.setServerPort(instance.getPort());
entity.setClusterName(instance.getClusterName());
entity.setGroupName(instance.getGroupName());
entity.setIsEnabled(instance.isEnabled());
entity.setIsEphemeral(instance.isEphemeral());
entity.setIsHealthy(instance.isHealthy());
if (instance.getMetadata() != null)
entity.setMetaData(JsonTool.serialize(instance.getMetadata(), false));
entity.setTagsText(TagsHelper.toString(instance.getTags()));
entity.setWeight(instance.getWeight());
}

protected ServiceInstance fromEntity(NopSysServiceInstance entity) {
ServiceInstance instance = new ServiceInstance();
instance.setInstanceId(entity.getInstanceId());
instance.setServiceName(entity.getServiceName());
instance.setAddr(entity.getServerAddr());
instance.setPort(entity.getServerPort());
instance.setClusterName(entity.getClusterName());
instance.setGroupName(entity.getGroupName());
instance.setEnabled(entity.getIsEnabled());
instance.setEphemeral(entity.getIsEphemeral());
instance.setHealthy(entity.getIsHealthy());
instance.setWeight(entity.getWeight());
if (entity.getMetaData() != null)
instance.setMetadata((Map<String, String>) JsonTool.parseNonStrict(entity.getMetaData()));
instance.setTags(TagsHelper.parse(entity.getTagsText(), ','));
return instance;
}
}

0 comments on commit 060340a

Please sign in to comment.