Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge dev into master #1456

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: v1.51.2
version: v1.55.2
args: --timeout=5m --skip-dirs='api,test,.*/controller/(v3|v4)$,.*/bootstrap$,examples,integration' --enable gofmt,revive,gocyclo,goimports --skip-files=.*_test.go$
14 changes: 14 additions & 0 deletions .github/workflows/static_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ jobs:
sudo docker-compose -f ./scripts/docker-compose.yaml up -d
sleep 20
bash -x scripts/ut_test_in_docker.sh mongo
local:
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.18
uses: actions/setup-go@v1
with:
go-version: 1.18
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v1
- name: UT-LOCAL_STORAGE
run: |
rm -rf /data/schemas
bash -x scripts/ut_test_in_docker.sh local
integration-test:
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Client struct {
Cfg Config
}

func (c *Client) CommonHeaders(ctx context.Context) http.Header {
func (c *Client) CommonHeaders(_ context.Context) http.Header {
var headers = make(http.Header)
// TODO overwrote by context values
if len(c.Cfg.Token) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion client/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/apache/servicecomb-service-center/pkg/util"
)

func (c *LBClient) WebsocketDial(ctx context.Context, api string, headers http.Header) (conn *websocket.Conn, err error) {
func (c *LBClient) WebsocketDial(_ context.Context, api string, headers http.Header) (conn *websocket.Conn, err error) {
dialer := &websocket.Dialer{TLSClientConfig: c.TLS}
var errs []string
for i := 0; i < c.Retries; i++ {
Expand Down
4 changes: 2 additions & 2 deletions datasource/dependency_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func toString(in *discovery.MicroServiceKey) string {
return path.GenerateProviderDependencyRuleKey(in.Tenant, in)
}

func ParseAddOrUpdateRules(ctx context.Context, dep *Dependency, oldProviderRules *discovery.MicroServiceDependency) {
func ParseAddOrUpdateRules(_ context.Context, dep *Dependency, oldProviderRules *discovery.MicroServiceDependency) {
deleteDependencyRuleList := make([]*discovery.MicroServiceKey, 0, len(oldProviderRules.Dependency))
createDependencyRuleList := make([]*discovery.MicroServiceKey, 0, len(dep.ProvidersRule))
existDependencyRuleList := make([]*discovery.MicroServiceKey, 0, len(oldProviderRules.Dependency))
Expand All @@ -83,7 +83,7 @@ func ParseAddOrUpdateRules(ctx context.Context, dep *Dependency, oldProviderRule
setDep(dep, createDependencyRuleList, existDependencyRuleList, deleteDependencyRuleList)
}

func ParseOverrideRules(ctx context.Context, dep *Dependency, oldProviderRules *discovery.MicroServiceDependency) {
func ParseOverrideRules(_ context.Context, dep *Dependency, oldProviderRules *discovery.MicroServiceDependency) {
deleteDependencyRuleList := make([]*discovery.MicroServiceKey, 0, len(oldProviderRules.Dependency))
createDependencyRuleList := make([]*discovery.MicroServiceKey, 0, len(dep.ProvidersRule))
existDependencyRuleList := make([]*discovery.MicroServiceKey, 0, len(oldProviderRules.Dependency))
Expand Down
4 changes: 2 additions & 2 deletions datasource/etcd/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func init() {
rbac.Install("embedded_etcd", NewRbacDAO)
}

func NewRbacDAO(opts rbac.Options) (rbac.DAO, error) {
func NewRbacDAO(_ rbac.Options) (rbac.DAO, error) {
return &RbacDAO{}, nil
}

Expand Down Expand Up @@ -187,7 +187,7 @@ func (ds *RbacDAO) DeleteAccount(ctx context.Context, names []string) (bool, err
return true, nil
}

func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *crbac.Account) error {
func (ds *RbacDAO) UpdateAccount(ctx context.Context, _ string, account *crbac.Account) error {
var (
opts []etcdadpt.OpOptions
err error
Expand Down
2 changes: 1 addition & 1 deletion datasource/etcd/cache/filter_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (f *ConsumerFilter) Name(ctx context.Context, _ *cache.Node) string {
return ctx.Value(CtxConsumerID).(string)
}

func (f *ConsumerFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
func (f *ConsumerFilter) Init(_ context.Context, _ *cache.Node) (node *cache.Node, err error) {
node = cache.NewNode()
node.Cache.Set(DepResult, &DependencyRuleItem{})
return
Expand Down
2 changes: 1 addition & 1 deletion datasource/etcd/cache/filter_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (f *ServiceFilter) Name(ctx context.Context, _ *cache.Node) string {
provider.ServiceName}, "/")
}

func (f *ServiceFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
func (f *ServiceFilter) Init(_ context.Context, _ *cache.Node) (node *cache.Node, err error) {
node = cache.NewNode()
return
}
2 changes: 1 addition & 1 deletion datasource/etcd/cache/filter_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (f *VersionFilter) Name(ctx context.Context, _ *cache.Node) string {
return ""
}

func (f *VersionFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
func (f *VersionFilter) Init(ctx context.Context, _ *cache.Node) (node *cache.Node, err error) {
instance, ok := ctx.Value(CtxProviderInstanceKey).(*pb.HeartbeatSetElement)
if ok {
node = cache.NewNode()
Expand Down
6 changes: 3 additions & 3 deletions datasource/etcd/cache/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (f *FindInstancesCache) GetWithProviderID(ctx context.Context, consumer *pb
func (f *FindInstancesCache) Remove(provider *pb.MicroServiceKey) {
f.Tree.Remove(context.WithValue(context.Background(), CtxProviderKey, provider))
if len(provider.Alias) > 0 {
copy := *provider
copy.ServiceName = copy.Alias
f.Tree.Remove(context.WithValue(context.Background(), CtxProviderKey, &copy))
copyProvider := *provider
copyProvider.ServiceName = copyProvider.Alias
f.Tree.Remove(context.WithValue(context.Background(), CtxProviderKey, &copyProvider))
}
}
102 changes: 86 additions & 16 deletions datasource/etcd/ms.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"time"

"github.com/apache/servicecomb-service-center/syncer/service/event"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/go-chassis/cari/sync"
"github.com/go-chassis/foundation/gopool"
"github.com/little-cui/etcdadpt"

"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/cache"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
Expand All @@ -40,25 +35,34 @@ import (
esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
"github.com/apache/servicecomb-service-center/datasource/local"
"github.com/apache/servicecomb-service-center/datasource/schema"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/plugin/uuid"
quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
"github.com/apache/servicecomb-service-center/syncer/service/event"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/go-chassis/cari/sync"
"github.com/go-chassis/foundation/gopool"
"github.com/little-cui/etcdadpt"
)

type MetadataManager struct {
// InstanceTTL options
InstanceTTL int64
}

const LOCAL = "local"

// RegisterService implement:
// 1. capsule request to etcd kv format
// 2. invoke etcd client to store data
// 3. check etcd-client response && construct createServiceResponse
func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.CreateServiceRequest) (
*pb.CreateServiceResponse, error) {
response *pb.CreateServiceResponse, err error) {
remoteIP := util.GetIPFromContext(ctx)
service := request.Service
serviceFlag := util.StringJoin([]string{
Expand Down Expand Up @@ -90,6 +94,32 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
return nil, pb.NewError(pb.ErrInternal, err.Error())
}

if schema.StorageType == LOCAL {
contents := make([]*schema.ContentItem, len(service.Schemas))
err = schema.Instance().PutManyContent(ctx, &schema.PutManyContentRequest{
ServiceID: service.ServiceId,
SchemaIDs: service.Schemas,
Contents: contents,
Init: true,
})
if err != nil {
return nil, err
}

serviceMutex := local.GetOrCreateMutex(service.ServiceId)
serviceMutex.Lock()
defer serviceMutex.Unlock()
}

defer func() {
if schema.StorageType == LOCAL && err != nil {
cleanDirErr := local.CleanDir(filepath.Join(schema.RootFilePath, domainProject, service.ServiceId))
if cleanDirErr != nil {
log.Error("clean dir error when rollback in RegisterService", cleanDirErr)
}
}
}()

key := path.GenerateServiceKey(domainProject, service.ServiceId)
alias := path.GenerateServiceAliasKey(serviceKey)

Expand Down Expand Up @@ -128,6 +158,7 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
if resp.Succeeded {
log.Info(fmt.Sprintf("create micro-service[%s][%s] successfully, operator: %s",
service.ServiceId, serviceFlag, remoteIP))

return &pb.CreateServiceResponse{
ServiceId: service.ServiceId,
}, nil
Expand Down Expand Up @@ -157,7 +188,7 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
}, nil
}

func (ds *MetadataManager) ListService(ctx context.Context, request *pb.GetServicesRequest) (
func (ds *MetadataManager) ListService(ctx context.Context, _ *pb.GetServicesRequest) (
*pb.GetServicesResponse, error) {
services, err := eutil.GetAllServiceUtil(ctx)
if err != nil {
Expand Down Expand Up @@ -186,7 +217,7 @@ func (ds *MetadataManager) GetService(ctx context.Context, request *pb.GetServic
return singleService, nil
}

func (ds *MetadataManager) GetOverview(ctx context.Context, request *pb.GetServicesRequest) (
func (ds *MetadataManager) GetOverview(ctx context.Context, _ *pb.GetServicesRequest) (
*pb.Statistics, error) {
ctx = util.WithCacheOnly(ctx)
st, err := statistics(ctx, false)
Expand Down Expand Up @@ -848,10 +879,9 @@ func (ds *MetadataManager) SendManyHeartbeat(ctx context.Context, request *pb.He
log.Warn(fmt.Sprintf("instance[%s/%s] is duplicate request heartbeat set",
heartbeatElement.ServiceId, heartbeatElement.InstanceId))
continue
} else {
existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true
noMultiCounter++
}
existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true
noMultiCounter++
gopool.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, heartbeatElement))
}
count := 0
Expand Down Expand Up @@ -913,7 +943,7 @@ func (ds *MetadataManager) SendHeartbeat(ctx context.Context, request *pb.Heartb
return nil
}

func (ds *MetadataManager) ListManyInstances(ctx context.Context, request *pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) {
func (ds *MetadataManager) ListManyInstances(ctx context.Context, _ *pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) {
domainProject := util.ParseDomainProject(ctx)
key := path.GetInstanceRootKey(domainProject) + path.SPLIT
opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
Expand Down Expand Up @@ -1426,7 +1456,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s
return nil
}

func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) error {
func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) (err error) {
serviceID := request.ServiceId
force := request.Force
remoteIP := util.GetIPFromContext(ctx)
Expand All @@ -1443,6 +1473,46 @@ func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.De
return pb.NewError(pb.ErrInvalidParams, err.Error())
}

// try to delete schema files
if schema.StorageType == LOCAL {
tmpPath := filepath.Join(schema.RootFilePath, "tmp", domainProject, serviceID)
originPath := filepath.Join(schema.RootFilePath, domainProject, serviceID)

err = local.MoveDir(originPath, tmpPath)
if err != nil {
log.Error(fmt.Sprintf("%s micro-service[%s] failed, clean local schmea dir failed, operator: %s",
title, serviceID, remoteIP), err)
return err
}

serviceMutex := local.GetOrCreateMutex(serviceID)
serviceMutex.Lock()
defer serviceMutex.Unlock()
}

defer func() {
if schema.StorageType == LOCAL {
tmpPath := filepath.Join(schema.RootFilePath, "tmp", domainProject, serviceID)
originPath := filepath.Join(schema.RootFilePath, domainProject, serviceID)
var rollbackErr error
if err != nil {
rollbackErr = local.MoveDir(tmpPath, originPath)
if rollbackErr != nil {
log.Error("clean dir error when rollback in UnregisterService", err)
}
} else {
rollbackErr = local.CleanDir(tmpPath)
if rollbackErr != nil {
log.Error("clean tmp dir error when rollback in UnregisterService", err)
}
rollbackErr = os.Remove(originPath)
if rollbackErr != nil {
log.Error("clean origin dir error when rollback in UnregisterService", err)
}
}
}
}()

microservice, err := eutil.GetService(ctx, domainProject, serviceID)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
Expand Down Expand Up @@ -1519,7 +1589,7 @@ func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.De
}
opts = append(opts, optDeleteDep)

//删除schemas
// 删除schemas
opts = append(opts, etcdadpt.OpDel(
etcdadpt.WithStrKey(path.GenerateServiceSchemaKey(domainProject, serviceID, "")),
etcdadpt.WithPrefix()))
Expand Down
2 changes: 1 addition & 1 deletion datasource/etcd/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func init() {
schema.Install("embedded_etcd", NewSchemaDAO)
}

func NewSchemaDAO(opts schema.Options) (schema.DAO, error) {
func NewSchemaDAO(_ schema.Options) (schema.DAO, error) {
return &SchemaDAO{}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion datasource/etcd/sd/aggregate/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func (r *Repository) New(t kvstore.Type, cfg *kvstore.Options) state.State {
return NewAggregator(t, cfg)
}

func NewRepository(opts state.Config) state.Repository {
func NewRepository(_ state.Config) state.Repository {
return &Repository{}
}
2 changes: 1 addition & 1 deletion datasource/etcd/sd/k8s/adaptor/listwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type k8sListWatcher struct {
cb OnEventFunc
}

func (w *k8sListWatcher) Handle(ctx context.Context, obj interface{}) {
func (w *k8sListWatcher) Handle(_ context.Context, obj interface{}) {
if w.cb == nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion datasource/etcd/sd/k8s/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ func (r *Repository) New(t kvstore.Type, cfg *kvstore.Options) state.State {
return adaptor.NewK8sAdaptor(t, cfg)
}

func NewRepository(opts state.Config) state.Repository {
func NewRepository(_ state.Config) state.Repository {
return &Repository{}
}
2 changes: 1 addition & 1 deletion datasource/etcd/sd/servicecenter/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func (r *Repository) New(t kvstore.Type, cfg *kvstore.Options) state.State {
return NewServiceCenterAdaptor(t, cfg)
}

func NewRepository(opts state.Config) state.Repository {
func NewRepository(_ state.Config) state.Repository {
return &Repository{}
}
2 changes: 1 addition & 1 deletion datasource/etcd/sd/servicecenter/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (c *Syncer) checkWithConflictHandleFunc(local *Cacher, remote dump.Getter,
}
}

func (c *Syncer) skipHandleFunc(origin *dump.KV, conflict dump.Getter, index int) {
func (c *Syncer) skipHandleFunc(_ *dump.KV, _ dump.Getter, _ int) {
}

func (c *Syncer) logConflictFunc(origin *dump.KV, conflict dump.Getter, index int) {
Expand Down
2 changes: 1 addition & 1 deletion datasource/etcd/state/etcd/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ func (r *Repository) New(t kvstore.Type, cfg *kvstore.Options) state.State {
return NewEtcdState(t.String(), cfg)
}

func NewRepository(opts state.Config) state.Repository {
func NewRepository(_ state.Config) state.Repository {
return &Repository{}
}
Loading
Loading