Skip to content

Commit

Permalink
etcdutil: remove stale client endpoints for healthyChecker (tikv#7227
Browse files Browse the repository at this point in the history
…) (tikv#231)

close tikv#7226

remove stale client endpoints for `healthyChecker`

Signed-off-by: lhy1024 <[email protected]>
Co-authored-by: iosmanthus <[email protected]>
  • Loading branch information
lhy1024 and iosmanthus authored Nov 20, 2023
1 parent 01dbbff commit 01c786b
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,18 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string {
}

func (checker *healthyChecker) update(eps []string) {
epMap := make(map[string]struct{})
for _, ep := range eps {
epMap[ep] = struct{}{}
}

for ep := range epMap {
// check if client exists, if not, create one, if exists, check if it's offline or disconnected.
if client, ok := checker.Load(ep); ok {
lastHealthy := client.(*healthyClient).lastHealth
if time.Since(lastHealthy) > etcdServerOfflineTimeout {
log.Info("[etcd client] some etcd server maybe offline", zap.String("endpoint", ep))
checker.Delete(ep)
checker.removeClient(ep)
}
if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
// try to reset client endpoint to trigger reconnect
Expand All @@ -390,6 +395,16 @@ func (checker *healthyChecker) update(eps []string) {
}
checker.addClient(ep, time.Now())
}

// check if there are some stale clients, if exists, remove them.
checker.Range(func(key, value interface{}) bool {
ep := key.(string)
if _, ok := epMap[ep]; !ok {
log.Info("remove stale etcd client", zap.String("endpoint", ep))
checker.removeClient(ep)
}
return true
})
}

func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
Expand All @@ -404,6 +419,15 @@ func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
})
}

func (checker *healthyChecker) removeClient(ep string) {
if client, ok := checker.LoadAndDelete(ep); ok {
err := client.(*healthyClient).Close()
if err != nil {
log.Error("failed to close etcd healthy client", zap.Error(err))
}
}
}

func syncUrls(client *clientv3.Client) []string {
// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183
ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), DefaultRequestTimeout)
Expand Down

0 comments on commit 01c786b

Please sign in to comment.