Skip to content

Commit

Permalink
[fea] service-center support the feature of manage environments (#1471)
Browse files Browse the repository at this point in the history
Co-authored-by: songshiyuan 00649746 <[email protected]>
  • Loading branch information
tornado-ssy and songshiyuan 00649746 authored May 31, 2024
1 parent 7818787 commit f27f047
Show file tree
Hide file tree
Showing 27 changed files with 928 additions and 52 deletions.
13 changes: 7 additions & 6 deletions datasource/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ const (
RegistryAppID = "default"
Provider = "p"

ResourceAccount = "account"
ResourceRole = "role"
ResourceService = "service"
ResourceKV = "kv"
ResourceInstance = "instance"
ResourceHeartbeat = "heartbeat"
ResourceAccount = "account"
ResourceRole = "role"
ResourceService = "service"
ResourceEnvironment = "env"
ResourceKV = "kv"
ResourceInstance = "instance"
ResourceHeartbeat = "heartbeat"
)
226 changes: 226 additions & 0 deletions datasource/etcd/ms.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

pb "github.com/go-chassis/cari/discovery"
ev "github.com/go-chassis/cari/env"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/go-chassis/cari/sync"
"github.com/go-chassis/etcdadpt"
Expand All @@ -47,6 +48,7 @@ import (
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/plugin/uuid"
"github.com/apache/servicecomb-service-center/server/service/disco"
quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
"github.com/apache/servicecomb-service-center/syncer/service/event"
)
Expand Down Expand Up @@ -1693,3 +1695,227 @@ func (ds *MetadataManager) UpdateManyInstanceStatus(ctx context.Context, match *
}
return nil
}

func (ds *MetadataManager) ListEnvironments(ctx context.Context) (
*ev.GetEnvironmentsResponse, error) {
envs, err := eutil.GetAllEnvironmentUtil(ctx)
if err != nil {
log.Error("get all services by domain failed", err)
return nil, pb.NewError(pb.ErrInternal, err.Error())
}

return &ev.GetEnvironmentsResponse{
Environments: envs,
}, nil
}

func (ds *MetadataManager) RegisterEnvironment(ctx context.Context, request *ev.CreateEnvironmentRequest) (
response *ev.CreateEnvironmentResponse, err error) {
remoteIP := util.GetIPFromContext(ctx)
env := request.Environment
envFlag := util.StringJoin([]string{
env.Name, env.Description}, "/")
log.Info("will create environment:" + envFlag)
domainProject := util.ParseDomainProject(ctx)
envKey := &ev.EnvironmentKey{
Tenant: domainProject,
Name: env.Name,
}
envIndex := path.GenerateEnvironmentIndexKey(envKey)
// 产生全局environment id
requestEnvID := env.ID
if len(requestEnvID) == 0 {
ctx = util.SetContext(ctx, uuid.ContextKey, envIndex)
env.ID = uuid.Generator().GetEnvID(ctx)
}
data, err := json.Marshal(env)
if err != nil {
log.Error(fmt.Sprintf("create Environment[%s] failed, json marshal environment failed, operator: %s",
envFlag, remoteIP), err)
return nil, pb.NewError(pb.ErrInternal, err.Error())
}
key := path.GenerateEnvironmentKey(domainProject, env.ID)

opts := []etcdadpt.OpOptions{
etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)),
etcdadpt.OpPut(etcdadpt.WithStrKey(envIndex), etcdadpt.WithStrValue(env.ID)),
}
uniqueCmpOpts := []etcdadpt.CmpOptions{
etcdadpt.NotExistKey(key),
etcdadpt.NotExistKey(envIndex),
}
failOpts := []etcdadpt.OpOptions{
etcdadpt.OpGet(etcdadpt.WithStrKey(envIndex)),
}

syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceEnvironment, request)
if err != nil {
log.Error("fail to create sync opts", err)
return nil, pb.NewError(pb.ErrInternal, err.Error())
}
opts = append(opts, syncOpts...)

resp, err := etcdadpt.TxnWithCmp(ctx, opts, uniqueCmpOpts, failOpts)
if err != nil {
log.Error(fmt.Sprintf("create environment[%s] failed, operator: %s",
envFlag, remoteIP), err)
return nil, pb.NewError(pb.ErrUnavailableBackend, err.Error())
}

if resp.Succeeded {
log.Info(fmt.Sprintf("create environment[%s][%s] successfully, operator: %s",
env.ID, envFlag, remoteIP))
disco.EnvMap.Store(request.Environment.Name, struct{}{})
return &ev.CreateEnvironmentResponse{
EnvId: env.ID,
}, nil
}

if len(requestEnvID) != 0 {
if len(resp.Kvs) == 0 || requestEnvID != util.BytesToStringWithNoCopy(resp.Kvs[0].Value) {
log.Warn(fmt.Sprintf("create environment[%s] failed, environment already exists, operator: %s",
envFlag, remoteIP))
return nil, pb.NewError(pb.ErrEnvironmentAlreadyExists,
"ServiceID conflict or found the same service with different id.")
}
}

if len(resp.Kvs) == 0 {
// internal error?
log.Error(fmt.Sprintf("create environment[%s] failed, unexpected txn response, operator: %s",
envFlag, remoteIP), nil)
return nil, pb.NewError(pb.ErrInternal, "Unexpected txn response.")
}

existEnvironmentID := util.BytesToStringWithNoCopy(resp.Kvs[0].Value)
log.Warn(fmt.Sprintf("create environment[%s][%s] failed, environment already exists, operator: %s",
existEnvironmentID, envFlag, remoteIP))
disco.EnvMap.Store(request.Environment.Name, struct{}{})
return &ev.CreateEnvironmentResponse{
EnvId: existEnvironmentID,
}, nil
}

func (ds *MetadataManager) GetEnvironment(ctx context.Context, request *ev.GetEnvironmentRequest) (
*ev.Environment, error) {
domainProject := util.ParseDomainProject(ctx)
singleEnvironment, err := eutil.GetEnvironment(ctx, domainProject, request.EnvironmentId)

if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("get environment[%s] failed, environment does not exist in db", request.EnvironmentId))
return nil, pb.NewError(pb.ErrEnvironmentNotExists, "environment does not exist.")
}
log.Error(fmt.Sprintf("get environment[%s] failed, get environment file failed", request.EnvironmentId), err)
return nil, pb.NewError(pb.ErrInternal, err.Error())
}
return singleEnvironment, nil
}

func (ds *MetadataManager) UpdateEnvironment(ctx context.Context, request *ev.UpdateEnvironmentRequest) (err error) {
remoteIP := util.GetIPFromContext(ctx)
domainProject := util.ParseDomainProject(ctx)
envkey := path.GenerateEnvironmentKey(domainProject, request.Environment.ID)
oldEnvironment, err := eutil.GetEnvironment(ctx, domainProject, request.Environment.ID)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("environment does not exist, update environment[%s] failed, operator: %s",
request.Environment.ID, remoteIP))
return pb.NewError(pb.ErrEnvironmentNotExists, "Environment does not exist.")
}
log.Error(fmt.Sprintf("update environment[%s] failed, get environment failed, operator: %s",
request.Environment.ID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
request.Environment.ModTimestamp = strconv.FormatInt(time.Now().Unix(), 10)
request.Environment.Timestamp = oldEnvironment.Timestamp
data, err := json.Marshal(request.Environment)
if err != nil {
log.Error(fmt.Sprintf("update environment[%s] failed, json marshal environment failed, operator: %s",
request.Environment.ID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}

opts := []etcdadpt.OpOptions{
etcdadpt.OpPut(etcdadpt.WithStrKey(envkey), etcdadpt.WithValue(data)),
}
syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceEnvironment, request)
if err != nil {
log.Error("fail to create task", err)
return pb.NewError(pb.ErrInternal, err.Error())
}
opts = append(opts, syncOpts...)

// Set key file
resp, err := etcdadpt.TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.NotEqualVer(envkey, 0)), nil)
if err != nil {
log.Error(fmt.Sprintf("update environment[%s] properties failed, operator: %s", request.Environment.ID, remoteIP), err)
return pb.NewError(pb.ErrUnavailableBackend, err.Error())
}
if !resp.Succeeded {
log.Error(fmt.Sprintf("update environment[%s] failed, environment does not exist, operator: %s",
request.Environment.ID, remoteIP), err)
return pb.NewError(pb.ErrEnvironmentNotExists, "environment does not exist.")
}

log.Info(fmt.Sprintf("update environment[%s] successfully, operator: %s", request.Environment.ID, remoteIP))
return nil
}

func (ds *MetadataManager) UnregisterEnvironment(ctx context.Context, request *ev.DeleteEnvironmentRequest) (err error) {
environmentId := request.EnvironmentId

Check warning on line 1866 in datasource/etcd/ms.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var environmentId should be environmentID (revive)
remoteIP := util.GetIPFromContext(ctx)
domainProject := util.ParseDomainProject(ctx)

environment, err := eutil.GetEnvironment(ctx, domainProject, environmentId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("environment does not exist, del environmentId[%s] failed, operator: %s",
environmentId, remoteIP))
return pb.NewError(pb.ErrEnvironmentNotExists, "environment does not exist.")
}
log.Error(fmt.Sprintf("del environment[%s] failed, get environment file failed, operator: %s",
environmentId, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}

serviceEnvKey := path.GenerateServiceEnvIndexKey(domainProject, environment.Name)
if serviceUtil.ServiceEnvExist(ctx, serviceEnvKey) {
log.Error(fmt.Sprintf("del environment[%s] failed, get environment file failed, operator: %s",
environmentId, remoteIP), errors.New("this env has services"))
return pb.NewError(pb.ErrInternal, "this env has services")
}
environmentIdKey := path.GenerateEnvironmentKey(domainProject, environmentId)

Check warning on line 1888 in datasource/etcd/ms.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var environmentIdKey should be environmentIDKey (revive)
envKey := &ev.EnvironmentKey{
Tenant: domainProject,
Name: environment.Name,
}
opts := []etcdadpt.OpOptions{
etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateEnvironmentIndexKey(envKey))),
etcdadpt.OpDel(etcdadpt.WithStrKey(environmentIdKey)),
}
syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceEnvironment, environmentId,
&ev.DeleteEnvironmentRequest{EnvironmentId: environmentId})
if err != nil {
log.Error("fail to sync opt", err)
return pb.NewError(pb.ErrInternal, err.Error())
}
opts = append(opts, syncOpts...)

resp, err := etcdadpt.TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.NotEqualVer(environmentIdKey, 0)), nil)
if err != nil {
log.Error(fmt.Sprintf("del environment[%s] failed, operator: %s", environmentId, remoteIP), err)
return pb.NewError(pb.ErrUnavailableBackend, err.Error())
}
if !resp.Succeeded {
log.Error(fmt.Sprintf("del environment[%s] failed, environment does not exist, operator: %s",
environmentId, remoteIP), err)
return pb.NewError(pb.ErrEnvironmentNotExists, "environmentId does not exist.")
}

quotasvc.RemandEnvironment(ctx)

log.Info(fmt.Sprintf("del environment[%s] successfully, operator: %s", environmentId, remoteIP))
disco.EnvMap.Delete(environment.Name)
return nil
}
23 changes: 22 additions & 1 deletion datasource/etcd/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"context"
"strings"

pb "github.com/go-chassis/cari/discovery"
ev "github.com/go-chassis/cari/env"

"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
pb "github.com/go-chassis/cari/discovery"
)

const preEnvNum int64 = 4

func (ds *MetadataManager) CountService(ctx context.Context, request *pb.GetServiceCountRequest) (*pb.GetServiceCountResponse, error) {
domainProject := request.Domain
if request.Project != "" {
Expand Down Expand Up @@ -84,3 +88,20 @@ func (ds *MetadataManager) getGlobalInstanceCount(ctx context.Context, domainPro
}
return global, nil
}

func (ds *MetadataManager) CountEnvironment(ctx context.Context, request *ev.GetEnvironmentCountRequest) (*ev.GetEnvironmentCountResponse, error) {
domainProject := request.Domain
if request.Project != "" {
domainProject += path.SPLIT + request.Project
}
all, err := serviceUtil.GetOneDomainProjectEnvironmentCount(ctx, domainProject)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
return &ev.GetEnvironmentCountResponse{
Count: all - preEnvNum,
}, nil
}
41 changes: 41 additions & 0 deletions datasource/etcd/path/key_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"github.com/go-chassis/cari/discovery"

"github.com/apache/servicecomb-service-center/pkg/util"

ev "github.com/go-chassis/cari/env"
)

const (
SPLIT = "/"
RegistryRootKey = "cse-sr"
RegistrySysKey = "sys"
RegistryServiceKey = "ms"
RegistryEnvironmentKey = "envs"
RegistryInstanceKey = "inst"
RegistryFile = "files"
RegistryIndex = "indexes"
Expand Down Expand Up @@ -383,3 +386,41 @@ func GenerateMetricsKey(name, utc, domain string) string {
domain,
}, SPLIT)
}

func GenerateEnvironmentKey(domainProject string, envId string) string {

Check warning on line 390 in datasource/etcd/path/key_generator.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: func parameter envId should be envID (revive)
return util.StringJoin([]string{
GetEnvironmentRootKey(domainProject),
envId,
}, SPLIT)
}

func GetEnvironmentRootKey(domainProject string) string {
return util.StringJoin([]string{
GetRootKey(),
RegistryEnvironmentKey,
domainProject,
}, SPLIT)
}

func GenerateEnvironmentIndexKey(key *ev.EnvironmentKey) string {
return util.StringJoin([]string{
GetEnvironmentIndexRootKey(key.Tenant),
key.Name,
}, SPLIT)
}

func GetEnvironmentIndexRootKey(domainProject string) string {
return util.StringJoin([]string{
GetRootKey(),
RegistryEnvironmentKey,
RegistryIndex,
domainProject,
}, SPLIT)
}

func GenerateServiceEnvIndexKey(domainProject string, name string) string {
return util.StringJoin([]string{
GetServiceIndexRootKey(domainProject),
name,
}, SPLIT)
}
1 change: 1 addition & 0 deletions datasource/etcd/sd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ func DependencyRule() state.State { return state.Get(TypeDependencyRule) }
func DependencyQueue() state.State { return state.Get(TypeDependencyQueue) }
func Domain() state.State { return state.Get(TypeDomain) }
func Project() state.State { return state.Get(TypeProject) }
func Environment() state.State { return state.Get(TypeEnvironment) }
4 changes: 4 additions & 0 deletions datasource/etcd/sd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
TypeDependencyQueue kvstore.Type
TypeInstance kvstore.Type
TypeLease kvstore.Type
TypeEnvironment kvstore.Type
)

func RegisterInnerTypes() {
Expand Down Expand Up @@ -80,4 +81,7 @@ func RegisterInnerTypes() {
TypeProject = state.MustRegister("PROJECT", path.GetProjectRootKey(""),
state.WithInitSize(100),
state.WithParser(parser.StringParser))
TypeEnvironment = state.MustRegister("ENVIRONMENT", path.GetEnvironmentRootKey(""),
state.WithInitSize(100),
state.WithParser(value.EnvironmentParser))
}
Loading

0 comments on commit f27f047

Please sign in to comment.