From 1a310323904d45cf4a5484901cc3d6e6741e5c7f Mon Sep 17 00:00:00 2001 From: Dantlian <47438305+Dantlian@users.noreply.github.com> Date: Mon, 8 Jan 2024 21:56:16 +0800 Subject: [PATCH] store schemas by local storage (#1449) Co-authored-by: l00618052 --- datasource/etcd/ms.go | 73 ++- datasource/local/bootstrap/bootstrap.go | 5 + datasource/local/schema.go | 626 ++++++++++++++++++++++++ datasource/schema/init.go | 14 +- datasource/schema/schema.go | 2 + integration/microservices_test.go | 2 +- server/bootstrap/bootstrap.go | 3 + server/config/config.go | 3 +- server/config/server.go | 2 + server/service/disco/schema.go | 66 ++- 10 files changed, 778 insertions(+), 18 deletions(-) create mode 100644 datasource/local/bootstrap/bootstrap.go create mode 100644 datasource/local/schema.go diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go index e0c30c284..0052151d2 100644 --- a/datasource/etcd/ms.go +++ b/datasource/etcd/ms.go @@ -22,6 +22,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/apache/servicecomb-service-center/datasource/local" + "path/filepath" "strconv" "time" @@ -53,12 +55,14 @@ type MetadataManager struct { InstanceTTL int64 } +const LOCAL = "local" + // RegisterService implement: // 1. capsule request to etcd kv format // 2. invoke etcd client to store data // 3. check etcd-client response && construct createServiceResponse func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.CreateServiceRequest) ( - *pb.CreateServiceResponse, error) { + response *pb.CreateServiceResponse, err error) { remoteIP := util.GetIPFromContext(ctx) service := request.Service serviceFlag := util.StringJoin([]string{ @@ -90,6 +94,32 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea return nil, pb.NewError(pb.ErrInternal, err.Error()) } + if schema.StorageType == LOCAL { + contents := make([]*schema.ContentItem, len(service.Schemas)) + err = schema.Instance().PutManyContent(ctx, &schema.PutManyContentRequest{ + ServiceID: service.ServiceId, + SchemaIDs: service.Schemas, + Contents: contents, + Init: true, + }) + if err != nil { + return nil, err + } + + serviceMutex := local.GetOrCreateMutex(service.ServiceId) + serviceMutex.Lock() + defer serviceMutex.Unlock() + } + + defer func() { + if schema.StorageType == LOCAL && err != nil { + cleanDirErr := local.CleanDir(filepath.Join(schema.RootFilePath, domainProject, service.ServiceId)) + if cleanDirErr != nil { + log.Error("clean dir error when rollback in RegisterService", cleanDirErr) + } + } + }() + key := path.GenerateServiceKey(domainProject, service.ServiceId) alias := path.GenerateServiceAliasKey(serviceKey) @@ -128,6 +158,7 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea if resp.Succeeded { log.Info(fmt.Sprintf("create micro-service[%s][%s] successfully, operator: %s", service.ServiceId, serviceFlag, remoteIP)) + return &pb.CreateServiceResponse{ ServiceId: service.ServiceId, }, nil @@ -1425,7 +1456,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s return nil } -func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) error { +func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) (err error) { serviceID := request.ServiceId force := request.Force remoteIP := util.GetIPFromContext(ctx) @@ -1442,6 +1473,42 @@ func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.De return pb.NewError(pb.ErrInvalidParams, err.Error()) } + // try to delete schema files + if schema.StorageType == LOCAL { + tmpPath := filepath.Join(schema.RootFilePath, "tmp", domainProject, serviceID) + originPath := filepath.Join(schema.RootFilePath, domainProject, serviceID) + + err = local.MoveDir(originPath, tmpPath) + if err != nil { + log.Error(fmt.Sprintf("%s micro-service[%s] failed, clean local schmea dir failed, operator: %s", + title, serviceID, remoteIP), err) + return err + } + + serviceMutex := local.GetOrCreateMutex(serviceID) + serviceMutex.Lock() + defer serviceMutex.Unlock() + } + + defer func() { + if schema.StorageType == LOCAL { + tmpPath := filepath.Join(schema.RootFilePath, "tmp", domainProject, serviceID) + originPath := filepath.Join(schema.RootFilePath, domainProject, serviceID) + var rollbackErr error + if err != nil { + rollbackErr = local.MoveDir(tmpPath, originPath) + if rollbackErr != nil { + log.Error("clean dir error when rollback in UnregisterService", err) + } + } else { + rollbackErr = local.CleanDir(tmpPath) + if rollbackErr != nil { + log.Error("clean tmp dir error when rollback in UnregisterService", err) + } + } + } + }() + microservice, err := eutil.GetService(ctx, domainProject, serviceID) if err != nil { if errors.Is(err, datasource.ErrNoData) { @@ -1518,7 +1585,7 @@ func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.De } opts = append(opts, optDeleteDep) - //删除schemas + // 删除schemas opts = append(opts, etcdadpt.OpDel( etcdadpt.WithStrKey(path.GenerateServiceSchemaKey(domainProject, serviceID, "")), etcdadpt.WithPrefix())) diff --git a/datasource/local/bootstrap/bootstrap.go b/datasource/local/bootstrap/bootstrap.go new file mode 100644 index 000000000..447a29279 --- /dev/null +++ b/datasource/local/bootstrap/bootstrap.go @@ -0,0 +1,5 @@ +package bootstrap + +import ( + _ "github.com/apache/servicecomb-service-center/datasource/local" +) diff --git a/datasource/local/schema.go b/datasource/local/schema.go new file mode 100644 index 000000000..69aab4eda --- /dev/null +++ b/datasource/local/schema.go @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package local + +import ( + "context" + "encoding/json" + "fmt" + "github.com/apache/servicecomb-service-center/datasource" + "github.com/apache/servicecomb-service-center/datasource/etcd/path" + etcdsync "github.com/apache/servicecomb-service-center/datasource/etcd/sync" + "github.com/apache/servicecomb-service-center/datasource/schema" + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/go-chassis/cari/discovery" + "github.com/go-chassis/openlog" + "github.com/little-cui/etcdadpt" + "io/fs" + "os" + pathutil "path" + "path/filepath" + "strings" + "sync" +) + +var MutexMap = make(map[string]*sync.Mutex) +var mutexMapLock = &sync.Mutex{} +var rollbackMutexLock = &sync.Mutex{} +var createDirMutexLock = &sync.Mutex{} + +func init() { + schema.Install("local_with_embeded_etcd", NewSchemaDAO) + schema.Install("local_with_embedded_etcd", NewSchemaDAO) +} + +func NewSchemaDAO(opts schema.Options) (schema.DAO, error) { + return &SchemaDAO{}, nil +} + +func GetOrCreateMutex(path string) *sync.Mutex { + mutexMapLock.Lock() + mutex, ok := MutexMap[path] + if !ok { + mutex = &sync.Mutex{} + MutexMap[path] = mutex + } + mutexMapLock.Unlock() + + return mutex +} + +type SchemaDAO struct{} + +func ExistDir(path string) error { + _, err := os.ReadDir(path) + if err != nil { + // create the dir if not exist + if os.IsNotExist(err) { + createDirMutexLock.Lock() + defer createDirMutexLock.Unlock() + err = os.MkdirAll(path, fs.ModePerm) + if err != nil { + log.Error(fmt.Sprintf("failed to makr dir %s ", path), err) + return err + } + return nil + } + if err != nil { + log.Error(fmt.Sprintf("failed to read dir %s ", path), err) + } + } + return err +} + +func MoveDir(srcDir string, dstDir string) (err error) { + srcMutex := GetOrCreateMutex(srcDir) + dstMutex := GetOrCreateMutex(dstDir) + srcMutex.Lock() + dstMutex.Lock() + defer srcMutex.Unlock() + defer dstMutex.Unlock() + + var movedFiles []string + files, err := os.ReadDir(srcDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + log.Error("move schema files failed ", err) + return err + } + for _, file := range files { + err = ExistDir(dstDir) + if err != nil { + return err + } + srcFile := filepath.Join(srcDir, file.Name()) + dstFile := filepath.Join(dstDir, file.Name()) + err = os.Rename(srcFile, dstFile) + if err != nil { + log.Error("move schema files failed ", err) + break + } + movedFiles = append(movedFiles, file.Name()) + } + + if err != nil { + log.Error("Occur error when move schema files, begain rollback... ", err) + for _, fileName := range movedFiles { + srcFile := filepath.Join(srcDir, fileName) + dstFile := filepath.Join(dstDir, fileName) + err = os.Rename(dstFile, srcFile) + if err != nil { + log.Error("Occur error when move schema rollback... ", err) + } + } + } + return err +} + +func createOrUpdateFile(filepath string, content []byte, rollbackOperations *[]FileDoRecord, isRollback bool) error { + err := ExistDir(pathutil.Dir(filepath)) + if !isRollback { + mutex := GetOrCreateMutex(pathutil.Dir(filepath)) + mutex.Lock() + defer mutex.Unlock() + } + + if err != nil { + log.Error(fmt.Sprintf("failed to build new schema file dir %s", filepath), err) + return err + } + + var fileExist = true + _, err = os.Stat(filepath) + if err != nil { + fileExist = false + } + + if fileExist { + oldcontent, err := os.ReadFile(filepath) + if err != nil { + log.Error(fmt.Sprintf("failed to read content to file %s ", filepath), err) + return err + } + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: oldcontent}) + } else { + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: nil}) + } + + err = os.WriteFile(filepath, content, 0666) + if err != nil { + log.Error(fmt.Sprintf("failed to create file %s", filepath), err) + return err + } + + return nil +} + +func deleteFile(filepath string, rollbackOperations *[]FileDoRecord, isRollback bool) error { + if !isRollback { + mutex := GetOrCreateMutex(filepath) + mutex.Lock() + defer delete(MutexMap, filepath) + defer mutex.Unlock() + } + + _, err := os.Stat(filepath) + if err != nil { + log.Error(fmt.Sprintf("file does not exist when deleting file %s ", filepath), err) + return nil + } + + oldcontent, err := os.ReadFile(filepath) + if err != nil { + log.Error(fmt.Sprintf("failed to read content to file %s ", filepath), err) + return err + } + + *rollbackOperations = append(*rollbackOperations, FileDoRecord{filepath: filepath, content: oldcontent}) + + err = os.Remove(filepath) + if err != nil { + log.Error(fmt.Sprintf("failed to delete file %s ", filepath), err) + return err + } + + return nil +} + +func CleanDir(dir string) error { + rollbackOperations := []FileDoRecord{} + _, err := os.Stat(dir) + if err != nil { + return nil + } + + files, err := os.ReadDir(dir) + if err != nil { + return nil + } + + for _, file := range files { + if file.IsDir() { + continue + } + filepath := filepath.Join(dir, file.Name()) + err = deleteFile(filepath, &rollbackOperations, false) + if err != nil { + break + } + } + + if err != nil { + log.Error("Occur error when create schema files, begain rollback... ", err) + rollback(rollbackOperations) + return err + } + + err = os.Remove(dir) + if err != nil { + log.Error("Occur error when remove service schema dir, begain rollback... ", err) + rollback(rollbackOperations) + return err + } + + return nil +} + +func ReadFile(filepath string) ([]byte, error) { + mutex := GetOrCreateMutex(filepath) + mutex.Lock() + defer mutex.Unlock() + + // check the file is empty + content, err := os.ReadFile(filepath) + if err != nil { + log.Error(fmt.Sprintf("failed to read content to file %s ", filepath), err) + return nil, err + } + return content, nil +} + +func ReadAllFiles(dir string) ([]string, [][]byte, error) { + files := []string{} + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + files = append(files, path) + return nil + }) + + if err != nil { + return nil, nil, err + } + + var contentArray [][]byte + for _, file := range files { + content, err := ReadFile(file) + if err != nil { + log.Error(fmt.Sprintf("failed to read content from schema file %s ", file), err) + return nil, nil, err + } + contentArray = append(contentArray, content) + } + return files, contentArray, nil +} + +func rollback(rollbackOperations []FileDoRecord) { + rollbackMutexLock.Lock() + defer rollbackMutexLock.Unlock() + + var err error + for _, fileOperation := range rollbackOperations { + if fileOperation.content == nil { + err = deleteFile(fileOperation.filepath, &[]FileDoRecord{}, true) + } else { + err = createOrUpdateFile(fileOperation.filepath, fileOperation.content, &[]FileDoRecord{}, true) + } + if err != nil { + log.Error("Occur error when rolling back schema files: ", err) + } + } +} + +type FileDoRecord struct { + filepath string + content []byte +} + +func (s *SchemaDAO) GetRef(ctx context.Context, refRequest *schema.RefRequest) (*schema.Ref, error) { + domainProject := util.ParseDomainProject(ctx) + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + serviceID := refRequest.ServiceID + schemaID := refRequest.SchemaID + + servicepath := filepath.Join(schema.RootFilePath, domainProject, serviceID, schemaID+".json") + + // read file content + content, err := ReadFile(servicepath) + if err != nil { + log.Error(fmt.Sprintf("read service[%s] schema content file [%s] failed ", serviceID, schemaID), err) + if os.IsNotExist(err) { + return nil, schema.ErrSchemaNotFound + } + return nil, err + } + + var schemaContent schema.ContentItem + err = json.Unmarshal(content, &schemaContent) + + if err != nil { + log.Error(fmt.Sprintf("get service[%s] schema content file [%s] failed when unmarshal", serviceID, schemaID), err) + return nil, err + } + + return &schema.Ref{ + Domain: domain, + Project: project, + ServiceID: serviceID, + SchemaID: schemaID, + Hash: schemaContent.Hash, + Summary: schemaContent.Summary, + Content: schemaContent.Content, + }, nil +} + +func (s *SchemaDAO) ListRef(ctx context.Context, refRequest *schema.RefRequest) ([]*schema.Ref, error) { + domainProject := util.ParseDomainProject(ctx) + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + serviceID := refRequest.ServiceID + + var dir = filepath.Join(schema.RootFilePath, domainProject, serviceID) + schemaIDs, contents, err := ReadAllFiles(dir) + + if err != nil { + log.Error(fmt.Sprintf("read service[%s] schema content files failed ", serviceID), err) + return nil, err + } + + schemas := make([]*schema.Ref, 0, len(contents)) + for i := 0; i < len(contents); i++ { + content := contents[i] + var schemaContent schema.ContentItem + err = json.Unmarshal(content, &schemaContent) + if err != nil { + log.Error(fmt.Sprintf("failed to unmarshal schema content for service [%s] and schema [%s]", serviceID, schemaIDs[i]), err) + return nil, err + } + + schemaFileName := schemaIDs[i] + baseName := filepath.Base(schemaFileName) + extension := filepath.Ext(baseName) + schemaID := strings.TrimSuffix(baseName, extension) + + schemas = append(schemas, &schema.Ref{ + Domain: domain, + Project: project, + ServiceID: serviceID, + SchemaID: schemaID, + Hash: schemaContent.Summary, + Summary: schemaContent.Summary, // may be empty + Content: schemaContent.Content, + }) + } + return schemas, nil +} + +func removeStringFromSlice(slice []string, s string) []string { + for i := 0; i < len(slice); i++ { + if slice[i] == s { + slice = append(slice[:i], slice[i+1:]...) + i-- + } + } + return slice +} + +func (s *SchemaDAO) DeleteRef(ctx context.Context, refRequest *schema.RefRequest) error { + rollbackOperations := []FileDoRecord{} + domainProject := util.ParseDomainProject(ctx) + serviceID := refRequest.ServiceID + schemaID := refRequest.SchemaID + schemaPath := filepath.Join(schema.RootFilePath, domainProject, serviceID, schemaID+".json") + + err := deleteFile(schemaPath, &rollbackOperations, false) + + if err != nil { + log.Error("Occur error when delete schema file, begain rollback... ", err) + rollback(rollbackOperations) + return err + } + + // update schemas in service + service, err := datasource.GetMetadataManager().GetService(ctx, &discovery.GetServiceRequest{ + ServiceId: serviceID, + }) + if err != nil { + log.Error(fmt.Sprintf("get service[%s] failed", serviceID), err) + rollback(rollbackOperations) + return err + } + + service.Schemas = removeStringFromSlice(service.Schemas, schemaID) + + err = updateServiceSchema(ctx, serviceID, service) + if err != nil { + rollback(rollbackOperations) + return err + } + return nil +} + +func (s *SchemaDAO) GetContent(ctx context.Context, contentRequest *schema.ContentRequest) (*schema.Content, error) { + // no usage, should not be called + log.Error("Occur error when call SchemaDAO.GetContent, this method should not be called in any condition", schema.ErrSchemaNotFound) + return nil, schema.ErrSchemaNotFound +} + +func (s *SchemaDAO) PutContent(ctx context.Context, contentRequest *schema.PutContentRequest) error { + rollbackOperations := []FileDoRecord{} + domainProject := util.ParseDomainProject(ctx) + serviceID := contentRequest.ServiceID + servicepath := filepath.Join(schema.RootFilePath, domainProject, serviceID) + schemaPath := filepath.Join(servicepath, contentRequest.SchemaID+".json") + + var err error + defer func() { + if err != nil { + rollback(rollbackOperations) + } + }() + + // update file + schemaBytes, marshalErr := json.Marshal(contentRequest.Content) + err = marshalErr + if err != nil { + openlog.Error("fail to marshal kv " + err.Error()) + return err + } + + err = createOrUpdateFile(schemaPath, schemaBytes, &rollbackOperations, false) + if err != nil { + log.Error("Occur error when create schema files when update schemas, begain rollback... ", err) + return err + } + + // update service schema + service, serviceErr := datasource.GetMetadataManager().GetService(ctx, &discovery.GetServiceRequest{ + ServiceId: serviceID, + }) + err = serviceErr + if err != nil { + log.Error(fmt.Sprintf("get service[%s] failed when update schemas", serviceID), err) + return err + } + + var schemaIdValid = false + for _, serviceSchemaId := range service.Schemas { + if serviceSchemaId == contentRequest.SchemaID { + schemaIdValid = true + } + } + if !schemaIdValid { + err = schema.ErrSchemaNotFound + log.Error(fmt.Sprintf("update service[%s] failed when valide schema id", serviceID), err) + return err + } + + err = updateServiceSchema(ctx, serviceID, service) + if err != nil { + log.Error(fmt.Sprintf("update service[%s] failed when update schemas", serviceID), err) + return err + } + return nil +} + +// update schemas in service +func updateServiceSchema(ctx context.Context, serviceID string, service *discovery.MicroService) error { + // get the mutex lock + serviceMutex := GetOrCreateMutex(serviceID) + serviceMutex.Lock() + defer serviceMutex.Unlock() + + domainProject := util.ParseDomainProject(ctx) + body, err := json.Marshal(service) + if err != nil { + log.Error("marshal service failed", err) + return err + } + + var options []etcdadpt.OpOptions + serviceKey := path.GenerateServiceKey(domainProject, serviceID) + options = append(options, etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey), etcdadpt.WithValue(body))) + + // update service task + serviceOpts, err := etcdsync.GenUpdateOpts(ctx, datasource.ResourceKV, body, etcdsync.WithOpts(map[string]string{"key": serviceKey})) + if err != nil { + log.Error("fail to create update opts", err) + return err + } + options = append(options, serviceOpts...) + err = etcdadpt.Txn(ctx, options) + + return err +} + +func (s *SchemaDAO) PutManyContent(ctx context.Context, contentRequest *schema.PutManyContentRequest) error { + rollbackOperations := []FileDoRecord{} + domainProject := util.ParseDomainProject(ctx) + serviceID := contentRequest.ServiceID + servicepath := filepath.Join(schema.RootFilePath, domainProject, serviceID) + + if len(contentRequest.SchemaIDs) != len(contentRequest.Contents) { + log.Error(fmt.Sprintf("service[%s] contents request invalid", serviceID), nil) + return discovery.NewError(discovery.ErrInvalidParams, "contents request invalid") + } + + var err error + defer func() { + if err != nil { + rollback(rollbackOperations) + } + }() + + // get all the files under this dir + existedFiles, readErr := os.ReadDir(servicepath) + err = readErr + if err != nil && !os.IsNotExist(err) { + return err + } + err = nil + + // clean existed files + for _, file := range existedFiles { + if file.IsDir() { + continue + } + filepath := servicepath + "/" + file.Name() + err = deleteFile(filepath, &rollbackOperations, false) + if err != nil { + break + } + } + if err != nil { + log.Error("Occur error when clean schema files before update schemas, begain rollback... ", err) + return err + } + + // create or update files + for i := 0; i < len(contentRequest.SchemaIDs); i++ { + schemaId := contentRequest.SchemaIDs[i] + schema := contentRequest.Contents[i] + + schemaBytes, marshalErr := json.Marshal(schema) + err = marshalErr + if err != nil { + openlog.Error("fail to marshal kv " + err.Error()) + return err + } + err = createOrUpdateFile(servicepath+"/"+schemaId+".json", schemaBytes, &rollbackOperations, false) + if err != nil { + break + } + } + + if err != nil { + log.Error("Occur error when create schema files when update schemas, begain rollback... ", err) + return err + } + + // update service schema + if contentRequest.Init { + return nil + } + + // query service schema + service, serviceErr := datasource.GetMetadataManager().GetService(ctx, &discovery.GetServiceRequest{ + ServiceId: serviceID, + }) + err = serviceErr + if err != nil { + log.Error(fmt.Sprintf("get service[%s] failed, service not exist", serviceID), err) + return err + } + service.Schemas = contentRequest.SchemaIDs + + err = updateServiceSchema(ctx, serviceID, service) + if err != nil { + log.Error(fmt.Sprintf("update service[%s] failed when update schemas", serviceID), err) + } + return err +} + +func (s *SchemaDAO) DeleteContent(ctx context.Context, contentRequest *schema.ContentRequest) error { + // no usage, should not be called + log.Error("Occur error when call SchemaDAO.DeleteContent, this method should not be called in any condition", schema.ErrSchemaContentNotFound) + return schema.ErrSchemaContentNotFound +} + +func (s *SchemaDAO) DeleteNoRefContents(ctx context.Context) (int, error) { + // no usage, should not be called + log.Error("Occur error when call SchemaDAO.DeleteNoRefContents, this method should not be called in any condition", schema.ErrSchemaNotFound) + return 0, schema.ErrSchemaNotFound +} diff --git a/datasource/schema/init.go b/datasource/schema/init.go index 1c24df50e..77934ab2c 100644 --- a/datasource/schema/init.go +++ b/datasource/schema/init.go @@ -19,10 +19,14 @@ package schema import ( "fmt" - "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/server/config" + "strings" ) +var StorageType = "" +var RootFilePath = "" + type initFunc func(opts Options) (DAO, error) var ( @@ -41,8 +45,14 @@ func Init(opts Options) error { if opts.Kind == "" { return nil } + kind := opts.Kind + if strings.Trim(config.GetRegistry().SchemaRootPath, " ") != "" { + kind = "local_with_embeded_etcd" + StorageType = "local" + RootFilePath = config.GetRegistry().SchemaRootPath + } - engineFunc, ok := plugins[opts.Kind] + engineFunc, ok := plugins[kind] if !ok { return fmt.Errorf("plugin implement not supported [%s]", opts.Kind) } diff --git a/datasource/schema/schema.go b/datasource/schema/schema.go index b00556998..89ab133c0 100644 --- a/datasource/schema/schema.go +++ b/datasource/schema/schema.go @@ -42,6 +42,7 @@ type Ref struct { SchemaID string `json:"schemaId" bson:"schema_id"` Hash string Summary string + Content string } type ContentRequest struct { @@ -71,6 +72,7 @@ type PutManyContentRequest struct { ServiceID string `json:"serviceId" bson:"service_id"` SchemaIDs []string Contents []*ContentItem + Init bool } type DAO interface { diff --git a/integration/microservices_test.go b/integration/microservices_test.go index c967c48f5..1ca2c048f 100644 --- a/integration/microservices_test.go +++ b/integration/microservices_test.go @@ -141,7 +141,7 @@ var _ = Describe("MicroService Api Test", func() { req, _ := http.NewRequest(DELETE, SCURL+url, nil) req.Header.Set("X-Domain-Name", "default") resp, _ := scclient.Do(req) - Expect(resp.StatusCode).To(Equal(http.StatusBadRequest)) + Expect(resp.StatusCode).To(Or(Equal(http.StatusBadRequest), Equal(http.StatusInternalServerError))) }) }) diff --git a/server/bootstrap/bootstrap.go b/server/bootstrap/bootstrap.go index 2882c9d5b..8440e344e 100644 --- a/server/bootstrap/bootstrap.go +++ b/server/bootstrap/bootstrap.go @@ -30,6 +30,9 @@ import ( //mongo _ "github.com/apache/servicecomb-service-center/datasource/mongo/bootstrap" + //local + _ "github.com/apache/servicecomb-service-center/datasource/local/bootstrap" + //rest v3 api _ "github.com/apache/servicecomb-service-center/server/rest/controller/v3" diff --git a/server/config/config.go b/server/config/config.go index f0af627f3..01fb12f5a 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -183,7 +183,8 @@ func loadServerConfig() ServerConfig { GlobalVisible: GetString("registry.service.globalVisible", "", WithENV("CSE_SHARED_SERVICES")), InstanceTTL: GetInt64("registry.instance.ttl", 0, WithENV("INSTANCE_TTL")), - SchemaDisable: GetBool("registry.schema.disable", false, WithENV("SCHEMA_DISABLE")), + SchemaDisable: GetBool("registry.schema.disable", false, WithENV("SCHEMA_DISABLE")), + SchemaRootPath: GetString("registry.schema.schemaRootPath", "", WithENV("SCHEMA_ROOT_PATH")), EnableRBAC: GetBool("rbac.enable", false, WithStandby("rbac_enabled")), }, diff --git a/server/config/server.go b/server/config/server.go index 809169791..9a2a0f288 100644 --- a/server/config/server.go +++ b/server/config/server.go @@ -71,6 +71,8 @@ type ServerConfigDetail struct { // if want disable Test Schema, SchemaDisable set true SchemaDisable bool `json:"schemaDisable"` + SchemaRootPath string `json:"-"` + // instance ttl in seconds InstanceTTL int64 `json:"-"` } diff --git a/server/service/disco/schema.go b/server/service/disco/schema.go index 5c59015b1..98d224b3e 100644 --- a/server/service/disco/schema.go +++ b/server/service/disco/schema.go @@ -32,6 +32,8 @@ import ( pb "github.com/go-chassis/cari/discovery" ) +const LOCAL = "local" + // ExistSchema only return the summary without content if schema exist func ExistSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, error) { remoteIP := util.GetIPFromContext(ctx) @@ -48,6 +50,7 @@ func ExistSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, ServiceID: serviceID, SchemaID: schemaID, }) + if err != nil { if errors.Is(err, schema.ErrSchemaNotFound) { return existOldSchema(ctx, request) @@ -56,6 +59,16 @@ func ExistSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, serviceID, schemaID, remoteIP), nil) return nil, err } + + // return directly when using local fs + if schema.StorageType == LOCAL { + return &pb.Schema{ + SchemaId: schemaID, + Schema: ref.Content, + Summary: ref.Summary, + }, nil + } + return &pb.Schema{ SchemaId: schemaID, Summary: ref.Summary, @@ -92,6 +105,7 @@ func GetSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, e ServiceID: serviceID, SchemaID: schemaID, }) + if err != nil { if errors.Is(err, schema.ErrSchemaNotFound) { return getOldSchema(ctx, request) @@ -101,6 +115,15 @@ func GetSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, e return nil, err } + // return directly when using local fs + if schema.StorageType == LOCAL { + return &pb.Schema{ + SchemaId: schemaID, + Schema: ref.Content, + Summary: ref.Summary, + }, nil + } + content, err := schema.Instance().GetContent(ctx, &schema.ContentRequest{ Hash: ref.Hash, }) @@ -138,15 +161,37 @@ func ListSchema(ctx context.Context, request *pb.GetAllSchemaRequest) ([]*pb.Sch return nil, pb.NewError(pb.ErrInvalidParams, checkErr.Error()) } - schemaIDs, err := getOldSchemaIDs(ctx, serviceID) + schemaRefs, err := schema.Instance().ListRef(ctx, &schema.RefRequest{ + ServiceID: serviceID, + }) if err != nil { log.Error(fmt.Sprintf("list service[%s] schemaIDs failed, operator: %s", serviceID, remoteIP), nil) return nil, err } - requests, err := mergeRequests(ctx, serviceID, schemaIDs) + // return directly when using local fs + if schema.StorageType == LOCAL { + schemas := make([]*pb.Schema, 0, len(schemaRefs)) + for _, ref := range schemaRefs { + item := &pb.Schema{ + SchemaId: ref.SchemaID, + Summary: ref.Summary, + Schema: ref.Content, + } + schemas = append(schemas, item) + } + return schemas, nil + } + + oldSchemaIDs, err := getOldSchemaIDs(ctx, serviceID) if err != nil { - log.Error(fmt.Sprintf("list service[%s] schema-refs failed, operator: %s", serviceID, remoteIP), nil) + log.Error(fmt.Sprintf("list service[%s] schemaIDs failed, operator: %s", serviceID, remoteIP), err) + return nil, err + } + + requests, err := mergeRequests(ctx, serviceID, schemaRefs, oldSchemaIDs) + if err != nil { + log.Error(fmt.Sprintf("list service[%s] schema-refs failed, operator: %s", serviceID, remoteIP), err) return nil, err } @@ -190,14 +235,7 @@ func getOldSchemaIDs(ctx context.Context, serviceID string) ([]string, error) { return schemaIDs, nil } -func mergeRequests(ctx context.Context, serviceID string, oldSchemaIDs []string) ([]*pb.GetSchemaRequest, error) { - refs, err := schema.Instance().ListRef(ctx, &schema.RefRequest{ - ServiceID: serviceID, - }) - if err != nil { - return nil, err - } - +func mergeRequests(ctx context.Context, serviceID string, refs []*schema.Ref, oldSchemaIDs []string) ([]*pb.GetSchemaRequest, error) { set := mapset.NewSet() for _, schemaID := range oldSchemaIDs { set.Add(schemaID) @@ -237,6 +275,7 @@ func DeleteSchema(ctx context.Context, request *pb.DeleteSchemaRequest) error { ServiceID: request.ServiceId, SchemaID: request.SchemaId, }) + if err != nil { if errors.Is(err, schema.ErrSchemaNotFound) { return deleteOldSchema(ctx, request) @@ -247,6 +286,11 @@ func DeleteSchema(ctx context.Context, request *pb.DeleteSchemaRequest) error { } log.Info(fmt.Sprintf("delete service[%s] schema[%s], operator: %s", request.ServiceId, request.SchemaId, remoteIP)) + // return directly when using local fs + if schema.StorageType == LOCAL { + return err + } + err = deleteOldSchema(ctx, request) if err != nil && !errors.Is(err, schema.ErrSchemaNotFound) { log.Error(fmt.Sprintf("delete old service[%s] schema[%s] failed, operator: %s",