Skip to content

Commit

Permalink
store schemas by local storage (apache#1449)
Browse files Browse the repository at this point in the history
Co-authored-by: l00618052 <[email protected]>
  • Loading branch information
Dantlian and l00618052 committed Jan 15, 2024
1 parent 1f516ca commit 1a31032
Show file tree
Hide file tree
Showing 10 changed files with 778 additions and 18 deletions.
73 changes: 70 additions & 3 deletions datasource/etcd/ms.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/apache/servicecomb-service-center/datasource/local"
"path/filepath"
"strconv"
"time"

Expand Down Expand Up @@ -53,12 +55,14 @@ type MetadataManager struct {
InstanceTTL int64
}

const LOCAL = "local"

// RegisterService implement:
// 1. capsule request to etcd kv format
// 2. invoke etcd client to store data
// 3. check etcd-client response && construct createServiceResponse
func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.CreateServiceRequest) (
*pb.CreateServiceResponse, error) {
response *pb.CreateServiceResponse, err error) {
remoteIP := util.GetIPFromContext(ctx)
service := request.Service
serviceFlag := util.StringJoin([]string{
Expand Down Expand Up @@ -90,6 +94,32 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
return nil, pb.NewError(pb.ErrInternal, err.Error())
}

if schema.StorageType == LOCAL {
contents := make([]*schema.ContentItem, len(service.Schemas))
err = schema.Instance().PutManyContent(ctx, &schema.PutManyContentRequest{
ServiceID: service.ServiceId,
SchemaIDs: service.Schemas,
Contents: contents,
Init: true,
})
if err != nil {
return nil, err
}

serviceMutex := local.GetOrCreateMutex(service.ServiceId)
serviceMutex.Lock()
defer serviceMutex.Unlock()
}

defer func() {
if schema.StorageType == LOCAL && err != nil {
cleanDirErr := local.CleanDir(filepath.Join(schema.RootFilePath, domainProject, service.ServiceId))
if cleanDirErr != nil {
log.Error("clean dir error when rollback in RegisterService", cleanDirErr)
}
}
}()

key := path.GenerateServiceKey(domainProject, service.ServiceId)
alias := path.GenerateServiceAliasKey(serviceKey)

Expand Down Expand Up @@ -128,6 +158,7 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
if resp.Succeeded {
log.Info(fmt.Sprintf("create micro-service[%s][%s] successfully, operator: %s",
service.ServiceId, serviceFlag, remoteIP))

return &pb.CreateServiceResponse{
ServiceId: service.ServiceId,
}, nil
Expand Down Expand Up @@ -1425,7 +1456,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s
return nil
}

func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) error {
func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) (err error) {
serviceID := request.ServiceId
force := request.Force
remoteIP := util.GetIPFromContext(ctx)
Expand All @@ -1442,6 +1473,42 @@ func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.De
return pb.NewError(pb.ErrInvalidParams, err.Error())
}

// try to delete schema files
if schema.StorageType == LOCAL {
tmpPath := filepath.Join(schema.RootFilePath, "tmp", domainProject, serviceID)
originPath := filepath.Join(schema.RootFilePath, domainProject, serviceID)

err = local.MoveDir(originPath, tmpPath)
if err != nil {
log.Error(fmt.Sprintf("%s micro-service[%s] failed, clean local schmea dir failed, operator: %s",
title, serviceID, remoteIP), err)
return err
}

serviceMutex := local.GetOrCreateMutex(serviceID)
serviceMutex.Lock()
defer serviceMutex.Unlock()
}

defer func() {
if schema.StorageType == LOCAL {
tmpPath := filepath.Join(schema.RootFilePath, "tmp", domainProject, serviceID)
originPath := filepath.Join(schema.RootFilePath, domainProject, serviceID)
var rollbackErr error
if err != nil {
rollbackErr = local.MoveDir(tmpPath, originPath)
if rollbackErr != nil {
log.Error("clean dir error when rollback in UnregisterService", err)
}
} else {
rollbackErr = local.CleanDir(tmpPath)
if rollbackErr != nil {
log.Error("clean tmp dir error when rollback in UnregisterService", err)
}
}
}
}()

microservice, err := eutil.GetService(ctx, domainProject, serviceID)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
Expand Down Expand Up @@ -1518,7 +1585,7 @@ func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.De
}
opts = append(opts, optDeleteDep)

//删除schemas
// 删除schemas
opts = append(opts, etcdadpt.OpDel(
etcdadpt.WithStrKey(path.GenerateServiceSchemaKey(domainProject, serviceID, "")),
etcdadpt.WithPrefix()))
Expand Down
5 changes: 5 additions & 0 deletions datasource/local/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package bootstrap

import (
_ "github.com/apache/servicecomb-service-center/datasource/local"
)
Loading

0 comments on commit 1a31032

Please sign in to comment.