Skip to content

Commit

Permalink
Add readiness probe api (#1497)
Browse files Browse the repository at this point in the history
  • Loading branch information
humingcheng authored Jan 18, 2025
1 parent 8a969df commit 4a07d7e
Show file tree
Hide file tree
Showing 15 changed files with 217 additions and 35 deletions.
26 changes: 26 additions & 0 deletions docs/openapi/v4.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,32 @@ paths:
description: 内部错误
schema:
$ref: '#/definitions/Error'
/v4/{project}/registry/health/readiness:
get:
description: |
服务中心readiness探测接口,确认服务中心是否可以接收请求。
parameters:
- name: x-domain-name
in: header
type: string
default: default
- name: project
in: path
required: true
type: string
tags:
- base
responses:
200:
description: 状态正常,可以接收请求
400:
description: 错误的请求
schema:
$ref: '#/definitions/Error'
500:
description: 内部错误
schema:
$ref: '#/definitions/Error'
/v4/{project}/registry/microservices/{serviceId}:
get:
description: |
Expand Down
42 changes: 42 additions & 0 deletions pkg/protect/checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package protect

import (
"sync"
"time"

"github.com/apache/servicecomb-service-center/pkg/log"
)

// ProtectionChecker decides whether null-instance protection is currently
// in force.
type ProtectionChecker interface {
	// CheckProtection returns true while protection should be applied and
	// false once it should no longer be applied.
	CheckProtection() bool
}

// DelayedStopProtectChecker is a ProtectionChecker whose protection is active
// from its start time and ends once delay has elapsed.
type DelayedStopProtectChecker struct {
	// delay is how long protection lasts, measured from start.
	delay time.Duration
	// start is the instant protection began.
	start time.Time
	// logWhenStop makes sure the "protection stopped" message is logged only once.
	logWhenStop sync.Once
}

// NewDelayedSuccessChecker returns a DelayedStopProtectChecker whose
// protection window opens now and lasts for delay.
func NewDelayedSuccessChecker(delay time.Duration) *DelayedStopProtectChecker {
	checker := new(DelayedStopProtectChecker)
	checker.delay = delay
	checker.start = time.Now()
	// logWhenStop stays at its zero value, which is a ready-to-use sync.Once.
	return checker
}

// CheckProtection reports whether the protection window is still open.
// It returns true until more than d.delay has passed since d.start. After
// that it returns false, and the first such call logs that protection stopped.
func (d *DelayedStopProtectChecker) CheckProtection() bool {
	if elapsed := time.Since(d.start); elapsed <= d.delay {
		return true
	}
	// The window has closed; report it in the log a single time.
	d.logWhenStop.Do(func() { log.Info("null instance protection stopped") })
	return false
}

// GetGlobalProtectionChecker returns the package-level globalProtectionChecker
// exactly as it is currently set; no nil check is performed here.
// NOTE(review): the variable is read without any synchronization in this
// function; confirm whether concurrent writers need to be guarded against.
func GetGlobalProtectionChecker() ProtectionChecker {
return globalProtectionChecker
}
15 changes: 15 additions & 0 deletions pkg/protect/checker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package protect

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestDelayedStopProtectChecker_CheckProtection verifies that a checker
// protects while its delay has not elapsed and stops protecting afterwards.
// The expiry is simulated by backdating the start time rather than sleeping,
// which keeps the test instant and independent of scheduler timing.
func TestDelayedStopProtectChecker_CheckProtection(t *testing.T) {
	p := NewDelayedSuccessChecker(time.Hour)
	assert.True(t, p.CheckProtection())

	// Move the start far enough into the past that the delay is clearly exceeded.
	p.start = time.Now().Add(-2 * p.delay)
	assert.False(t, p.CheckProtection())
}
59 changes: 45 additions & 14 deletions pkg/protect/protect.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package protect

import (
"context"
"fmt"
"net/http"
"time"

"github.com/apache/servicecomb-service-center/server/config"
"github.com/go-chassis/foundation/gopool"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/config"
"github.com/apache/servicecomb-service-center/server/service/registry"
)

/**
Expand All @@ -16,18 +19,18 @@ indicating that sdk not need to clear cache
*/

var (
isWithinProtection bool
startupTimestamp int64
enableInstanceNullProtect bool
restartProtectInterval time.Duration
RestartProtectHttpCode int
validProtectCode = map[int]struct{}{http.StatusNotModified: {}, http.StatusUnprocessableEntity: {}, http.StatusInternalServerError: {}}
globalProtectionChecker ProtectionChecker
)

const (
	// maxInterval and minInterval are one day and zero respectively.
	// NOTE(review): presumably the accepted bounds for the configured restart
	// protect interval; confirm against Init.
	maxInterval = 24 * time.Hour
	minInterval = time.Duration(0)
	// defaultRestartProtectInterval is two minutes.
	defaultRestartProtectInterval = 2 * time.Minute
	// defaultReadinessCheckInterval is the pause between readiness probes in
	// watch; null instance protection should start again when the server is
	// unready.
	defaultReadinessCheckInterval = 5 * time.Second
)

func Init() {
Expand All @@ -50,24 +53,52 @@ func Init() {
log.Info(fmt.Sprintf("instance_null_protect.enable: %t", enableInstanceNullProtect))
log.Info(fmt.Sprintf("instance_null_protect.restart_protect_interval: %d", restartProtectInterval))
log.Info(fmt.Sprintf("instance_null_protect.http_status: %d", RestartProtectHttpCode))
startupTimestamp = time.Now().UnixNano()
isWithinProtection = true

StartProtectionAndStopDelayed()
gopool.Go(watch)

}

func IsWithinRestartProtection() bool {
// watch probes registry readiness every defaultReadinessCheckInterval until
// ctx is cancelled. A failed probe switches to always-on protection; a
// successful probe calls StartProtectionAndStopDelayed.
func watch(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(defaultReadinessCheckInterval):
			// Interval elapsed; fall through to the probe below.
		}

		if err := registry.Readiness(ctx); err != nil {
			AlwaysProtection()
		} else {
			StartProtectionAndStopDelayed()
		}
	}
}

func ShouldProtectOnNullInstance() bool {
if !enableInstanceNullProtect {
return false
}

if !isWithinProtection {
return false
if globalProtectionChecker == nil { // protect by default
return true
}

if time.Now().Add(-restartProtectInterval).UnixNano() > startupTimestamp {
log.Info("restart protection stop")
isWithinProtection = false
return false
return GetGlobalProtectionChecker().CheckProtection()
}

// StartProtectionAndStopDelayed installs a delayed-stop checker, lasting
// restartProtectInterval, as the global protection checker. If the global
// checker is already a *DelayedStopProtectChecker, the call does nothing
// and the existing protection window is not restarted.
func StartProtectionAndStopDelayed() {
	_, alreadyDelayed := globalProtectionChecker.(*DelayedStopProtectChecker)
	if !alreadyDelayed {
		globalProtectionChecker = NewDelayedSuccessChecker(restartProtectInterval)
		log.Info("start protection and stop delayed on null instance")
	}
}

func AlwaysProtection() {
if globalProtectionChecker == nil {
return
}
log.Info("within restart protection")
return true
globalProtectionChecker = nil
log.Info("always protect on null instance")
}
23 changes: 12 additions & 11 deletions pkg/protect/protect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ func TestIsWithinRestartProtection(t *testing.T) {

// protection switch off
enableInstanceNullProtect = false
assert.False(t, IsWithinRestartProtection())
// within protection
assert.False(t, ShouldProtectOnNullInstance())

enableInstanceNullProtect = true
isWithinProtection = true
startupTimestamp = time.Now().Add(-1 * time.Minute).UnixNano()
assert.True(t, IsWithinRestartProtection())
AlwaysProtection()
assert.True(t, ShouldProtectOnNullInstance())

// protection delay exceed
enableInstanceNullProtect = true
isWithinProtection = true
startupTimestamp = time.Now().Add(-2 * time.Minute).Unix()
assert.False(t, IsWithinRestartProtection())
restartProtectInterval = 1 * time.Second
StartProtectionAndStopDelayed()
assert.True(t, ShouldProtectOnNullInstance())
time.Sleep(restartProtectInterval)
assert.False(t, ShouldProtectOnNullInstance())

// always false after exceed
assert.False(t, IsWithinRestartProtection())
// only the first call takes effect
StartProtectionAndStopDelayed()
assert.False(t, ShouldProtectOnNullInstance())
}
1 change: 1 addition & 0 deletions server/alarm/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (

const (
IDBackendConnectionRefuse model.ID = "BackendConnectionRefuse"
IDScSelfHeartbeatFailed model.ID = "ScSelfHeartbeatFailed"
IDInternalError model.ID = "InternalError"
IDIncrementPullError model.ID = "IncrementPullError"
IDWebsocketOfScSyncerLost model.ID = "WebsocketOfScSyncerLost"
Expand Down
23 changes: 21 additions & 2 deletions server/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,29 @@ import (
"github.com/apache/servicecomb-service-center/server/alarm"
)

var healthChecker Checker = &DefaultHealthChecker{}
var healthChecker Checker = &NullChecker{}
var readinessChecker Checker = &DefaultHealthChecker{}

// Checker reports a health condition as an error value.
type Checker interface {
	// Healthy returns nil when the check passes and a non-nil error otherwise.
	Healthy() error
}

// NullChecker is a Checker that performs no check at all.
type NullChecker struct{}

// Healthy always returns nil.
func (NullChecker) Healthy() error {
	return nil
}

type DefaultHealthChecker struct {
}

func (hc *DefaultHealthChecker) Healthy() error {
for _, a := range alarm.ListAll() {
if a.ID == alarm.IDBackendConnectionRefuse && a.Status != alarm.Cleared {
if a.Status == alarm.Cleared {
continue
}
if a.ID == alarm.IDBackendConnectionRefuse || a.ID == alarm.IDScSelfHeartbeatFailed {
return errors.New(a.FieldString(alarm.FieldAdditionalContext))
}
}
Expand All @@ -48,3 +59,11 @@ func SetGlobalHealthChecker(hc Checker) {
func GlobalHealthChecker() Checker {
return healthChecker
}

// SetGlobalReadinessChecker replaces the package-level readinessChecker with hc.
// The value is stored as given; no nil check is performed here.
func SetGlobalReadinessChecker(hc Checker) {
readinessChecker = hc
}

// GlobalReadinessChecker returns the package-level readinessChecker as
// currently set.
func GlobalReadinessChecker() Checker {
return readinessChecker
}
2 changes: 1 addition & 1 deletion server/interceptor/interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func mockFunc(w http.ResponseWriter, r *http.Request) error {
case 1:
return errors.New("error")
case 0:
panic(errors.New("panic"))
return errors.New("panic")
default:
i++
}
Expand Down
3 changes: 1 addition & 2 deletions server/plugin/auth/buildin/buildin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"testing"
"time"

"github.com/form3tech-oss/jwt-go"
"github.com/patrickmn/go-cache"

_ "github.com/apache/servicecomb-service-center/test"
Expand Down Expand Up @@ -187,7 +186,7 @@ func TestSetTokenToCache(t *testing.T) {
rawToken1 := "**1"
rawToken2 := "**2"
deleta, _ := time.ParseDuration("10m")
claims := &jwt.MapClaims{"exp": float64(time.Now().Add(deleta).Unix())}
claims := map[string]interface{}{"exp": float64(time.Now().Add(deleta).Unix())}
t.Run("test Cache", func(t *testing.T) {
buildin.SetTokenToCache(tokenCache1, rawToken1, claims)
buildin.SetTokenToCache(tokenCache1, rawToken2, errors.New("bad token"))
Expand Down
9 changes: 4 additions & 5 deletions server/resource/disco/instance_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ package disco

import (
"fmt"
"github.com/apache/servicecomb-service-center/pkg/protect"
"io"
"net/http"
"strings"

"github.com/go-chassis/go-chassis/v2/pkg/codec"

pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/go-chassis/v2/pkg/codec"

"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/protect"
"github.com/apache/servicecomb-service-center/pkg/rest"
"github.com/apache/servicecomb-service-center/pkg/util"
discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
Expand Down Expand Up @@ -169,7 +168,7 @@ func (s *InstanceResource) FindInstances(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotModified)
return
}
if len(resp.Instances) == 0 && protect.IsWithinRestartProtection() {
if len(resp.Instances) == 0 && protect.ShouldProtectOnNullInstance() {
w.WriteHeader(protect.RestartProtectHttpCode)
return
}
Expand Down Expand Up @@ -272,7 +271,7 @@ func (s *InstanceResource) ListInstance(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotModified)
return
}
if len(resp.Instances) == 0 && protect.IsWithinRestartProtection() {
if len(resp.Instances) == 0 && protect.ShouldProtectOnNullInstance() {
w.WriteHeader(protect.RestartProtectHttpCode)
return
}
Expand Down
12 changes: 12 additions & 0 deletions server/rest/controller/v4/main_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net/http"
"sync"

"github.com/apache/servicecomb-service-center/server/service/registry"

discosvc "github.com/apache/servicecomb-service-center/server/service/disco"

"github.com/apache/servicecomb-service-center/pkg/rest"
Expand All @@ -48,6 +50,7 @@ func (s *MainService) URLPatterns() []rest.Route {
return []rest.Route{
{Method: http.MethodGet, Path: "/v4/:project/registry/version", Func: s.GetVersion},
{Method: http.MethodGet, Path: "/v4/:project/registry/health", Func: s.ClusterHealth},
{Method: http.MethodGet, Path: "/v4/:project/registry/health/readiness", Func: s.Readiness},
}
}

Expand All @@ -60,6 +63,15 @@ func (s *MainService) ClusterHealth(w http.ResponseWriter, r *http.Request) {
rest.WriteResponse(w, r, nil, resp)
}

// Readiness serves the readiness probe. It calls registry.Readiness with the
// request context, writes the resulting error as a service error if there
// is one, and otherwise writes a response with no error and no body object.
func (s *MainService) Readiness(w http.ResponseWriter, r *http.Request) {
	if err := registry.Readiness(r.Context()); err != nil {
		rest.WriteServiceError(w, err)
		return
	}
	rest.WriteResponse(w, r, nil, nil)
}

func (s *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
parseVersionOnce.Do(func() {
result := Result{
Expand Down
1 change: 1 addition & 0 deletions server/service/rbac/rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func add2WhiteAPIList() {
rbac.Add2WhiteAPIList(APITokenGranter)
rbac.Add2WhiteAPIList("/v4/:project/registry/version", "/version")
rbac.Add2WhiteAPIList("/v4/:project/registry/health", "/health")
rbac.Add2WhiteAPIList("/v4/:project/registry/health/readiness")

// user can list self permission without account get permission
Add2CheckPermWhiteAPIList(APISelfPerms)
Expand Down
Loading

0 comments on commit 4a07d7e

Please sign in to comment.