Skip to content

Commit

Permalink
region_cache: allow pd follower handle region api (#1072)
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB authored Dec 20, 2023
1 parent 85ca0a4 commit 1a33aca
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 6 deletions.
25 changes: 19 additions & 6 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,16 +1117,28 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
r = c.searchCachedRegion(key, isEndKey)
if r == nil {
// load region when it is not exists or expired.
lr, err := c.loadRegion(bo, key, isEndKey)
lr, err := c.loadRegion(bo, key, isEndKey, pd.WithAllowFollowerHandle())
if err != nil {
// no region data, return error if failure.
return nil, err
}
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r, true, true)
stale := !c.insertRegionToCache(r, true, true)
c.mu.Unlock()
// just retry once, it won't bring much overhead.
if stale {
lr, err = c.loadRegion(bo, key, isEndKey)
if err != nil {
// no region data, return error if failure.
return nil, err
}
r = lr
c.mu.Lock()
c.insertRegionToCache(r, true, true)
c.mu.Unlock()
}
} else if r.checkNeedReloadAndMarkUpdated() {
// load region when it be marked as need reload.
lr, err := c.loadRegion(bo, key, isEndKey)
Expand Down Expand Up @@ -1673,7 +1685,7 @@ func filterUnavailablePeers(region *pd.Region) {
// loadRegion loads region from pd client, and picks the first peer as leader.
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
// when processing in reverse order.
func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool) (*Region, error) {
func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...pd.GetRegionOption) (*Region, error) {
ctx := bo.GetCtx()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("loadRegion", opentracing.ChildOf(span.Context()))
Expand All @@ -1683,6 +1695,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)

var backoffErr error
searchPrev := false
opts = append(opts, pd.WithBuckets())
for {
if backoffErr != nil {
err := bo.Backoff(retry.BoPDRPC, backoffErr)
Expand All @@ -1694,9 +1707,9 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)
var reg *pd.Region
var err error
if searchPrev {
reg, err = c.pdClient.GetPrevRegion(ctx, key, pd.WithBuckets())
reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...)
} else {
reg, err = c.pdClient.GetRegion(ctx, key, pd.WithBuckets())
reg, err = c.pdClient.GetRegion(ctx, key, opts...)
}
metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds())
if err != nil {
Expand Down Expand Up @@ -1848,7 +1861,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
}
}
start := time.Now()
regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit)
regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, pd.WithAllowFollowerHandle())
metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds())
if err != nil {
if apicodec.IsDecodeError(err) {
Expand Down
34 changes: 34 additions & 0 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,40 @@ func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() {
s.Equal([]byte("b"), r.meta.EndKey)
}

func (s *testRegionCacheWithDelaySuite) TestFollowerGetStaleRegion() {
var delay uatomic.Bool
pdCli3 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
followerDelayCache := NewRegionCache(pdCli3)

delay.Store(true)
var wg sync.WaitGroup
wg.Add(1)
var final *Region
go func() {
var err error
// followerDelayCache is empty now, so it will go follower.
final, err = followerDelayCache.findRegionByKey(s.bo, []byte("z"), false)
s.NoError(err)
wg.Done()
}()
time.Sleep(30 * time.Millisecond)
delay.Store(false)
r, err := followerDelayCache.findRegionByKey(s.bo, []byte("y"), false)
s.NoError(err)
newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("z"), newPeersIDs, newPeersIDs[0])
r.invalidate(Other)
r, err = followerDelayCache.findRegionByKey(s.bo, []byte("y"), false)
s.NoError(err)
s.Equal([]byte("z"), r.meta.EndKey)

// no need to retry because
wg.Wait()
s.Equal([]byte("z"), final.meta.StartKey)

followerDelayCache.Close()
}

func generateKeyForSimulator(id int, keyLen int) []byte {
k := make([]byte, keyLen)
copy(k, fmt.Sprintf("%010d", id))
Expand Down

0 comments on commit 1a33aca

Please sign in to comment.