diff --git a/pkg/core/store.go b/pkg/core/store.go index 9ec9a44bfc8..a7d59355084 100644 --- a/pkg/core/store.go +++ b/pkg/core/store.go @@ -666,6 +666,10 @@ func NewStoresInfo() *StoresInfo { func (s *StoresInfo) GetStore(storeID uint64) *StoreInfo { s.RLock() defer s.RUnlock() + if s.stores == nil { + log.Fatal("invalid stores") + return nil + } store, ok := s.stores[storeID] if !ok { return nil diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 44a0ae69a46..8737ff5b54a 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -30,12 +30,15 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.uber.org/goleak" + "go.uber.org/zap" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -798,6 +801,23 @@ func TestRemovingProgress(t *testing.T) { output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=2", http.MethodGet, http.StatusNotFound) re.Contains(string(output), "no progress found for the given store ID") + testutil.Eventually(re, func() bool { + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores", http.MethodGet, http.StatusOK) + var storesInfo response.StoresInfo + if err := json.Unmarshal(output, &storesInfo); err != nil { + return false + } + if len(storesInfo.Stores) != 3 { + return false + } + for _, store := range storesInfo.Stores { + if store.Store.GetNodeState() != metapb.NodeState_Serving { + return false + } + } + return true + }) + // remove store 1 and store 2 _ = sendRequest(re, leader.GetAddr()+"/pd/api/v1/store/1", http.MethodDelete, http.StatusOK) _ = sendRequest(re, leader.GetAddr()+"/pd/api/v1/store/2", http.MethodDelete, http.StatusOK) @@ -838,7 +858,21 @@ func TestRemovingProgress(t *testing.T) { }) } + now := time.Now() testutil.Eventually(re, func() bool { + defer func() { + if time.Since(now) > 19*time.Second { + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=1", http.MethodGet, http.StatusOK) + re.NoError(json.Unmarshal(output, &p)) + log.Info("store 1 progress", zap.Any("progress", p)) + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=2", http.MethodGet, http.StatusOK) + re.NoError(json.Unmarshal(output, &p)) + log.Info("store 2 progress", zap.Any("progress", p)) + } + }() + if leader.GetRaftCluster() == nil { + return false + } // wait for cluster prepare if !leader.GetRaftCluster().IsPrepared() { leader.GetRaftCluster().SetPrepared() @@ -850,30 +884,41 @@ func TestRemovingProgress(t *testing.T) { re.NoError(err) defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.Info("not ready", zap.String("url", url), zap.Int("status", resp.StatusCode)) return false } output, err := io.ReadAll(resp.Body) re.NoError(err) re.NoError(json.Unmarshal(output, &p)) if p.Action != "removing" { + log.Info("not removing", zap.String("action", p.Action)) return false } // store 1: (60-20)/(60+50) ~= 0.36 // store 2: (30-10)/(30+40) ~= 0.28 // average progress ~= (0.36+0.28)/2 = 0.32 if fmt.Sprintf("%.2f", p.Progress) != "0.32" { + log.Info("progress not match", zap.Float64("progress", p.Progress)) return false } // store 1: 40/10s = 4 // store 2: 20/10s = 2 - // average speed = (2+4)/2 = 33 + // average speed = (2+4)/2 = 3.0 if p.CurrentSpeed != 3.0 { + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=1", http.MethodGet, http.StatusOK) + re.NoError(json.Unmarshal(output, &p)) + log.Info("store 1 progress", zap.Any("progress", p)) + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=2", http.MethodGet, http.StatusOK) + re.NoError(json.Unmarshal(output, &p)) + log.Info("store 2 progress", zap.Any("progress", p)) + log.Info("speed not match", zap.Float64("speed", p.CurrentSpeed)) return false } // store 1: (20+50)/4 = 17.5s // store 2: (10+40)/2 = 25s // average time = (17.5+25)/2 = 21.25s if p.LeftSeconds != 21.25 { + log.Info("time not match", zap.Float64("time", p.LeftSeconds)) return false } return true @@ -1097,15 +1142,22 @@ func sendRequest(re *require.Assertions, url string, method string, statusCode i testutil.Eventually(re, func() bool { resp, err := tests.TestDialClient.Do(req) - re.NoError(err) + if err != nil { + log.Info("send request failed", zap.Error(err)) + return false + } defer resp.Body.Close() // Due to service unavailability caused by environmental issues, // we will retry it. if resp.StatusCode == http.StatusServiceUnavailable { + log.Info("service unavailable", zap.String("url", url)) + return false + } + if resp.StatusCode != statusCode { + log.Info("status code not match", zap.String("url", url), zap.Int("status", resp.StatusCode)) return false } - re.Equal(statusCode, resp.StatusCode) output, err = io.ReadAll(resp.Body) re.NoError(err) return true diff --git a/tests/testutil.go b/tests/testutil.go index 406e09345b9..d499472f83f 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -211,7 +211,10 @@ func MustPutStore(re *require.Assertions, cluster *TestCluster, store *metapb.St if ts == 0 { ts = time.Now().UnixNano() } - storeInfo := grpcServer.GetRaftCluster().GetStore(store.GetId()) + raftCluster := grpcServer.GetRaftCluster() + id := store.GetId() + storeInfo := raftCluster.GetStore(id) + re.NotNil(storeInfo) newStore := storeInfo.Clone( core.SetStoreStats(&pdpb.StoreStats{ Capacity: uint64(10 * units.GiB), @@ -220,7 +223,7 @@ func MustPutStore(re *require.Assertions, cluster *TestCluster, store *metapb.St }), core.SetLastHeartbeatTS(time.Unix(ts/1e9, ts%1e9)), ) - grpcServer.GetRaftCluster().GetBasicCluster().PutStore(newStore) + raftCluster.GetBasicCluster().PutStore(newStore) if cluster.GetSchedulingPrimaryServer() != nil { cluster.GetSchedulingPrimaryServer().GetCluster().PutStore(newStore) }