Skip to content

Commit

Permalink
1.GetPortListenAddr获取内网ip时没有及时close导致 fd会泄露
Browse files Browse the repository at this point in the history
2. GetPortListenAddr改为ticker运行前获取一次传入
  • Loading branch information
ning1875 committed Jan 20, 2021
1 parent d83ab59 commit 81cbd53
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion collect/kube_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func DoKubeProxyOnNodeCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, funcName string) {

start := time.Now()
kubeproxyAddr, err := getPortListenAddr(cg.KubeProxyC.Port)
kubeproxyAddr, err := GetPortListenAddr(cg.KubeProxyC.Port)

if kubeproxyAddr == "" {
level.Warn(logger).Log("msg", "getPortListenAddrEmptykubeProxyAddr", "funcName", funcName, "err:", err, "port", cg.KubeProxyC.Port)
Expand Down
31 changes: 16 additions & 15 deletions collect/kubelet_cadvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"
)

func getPortListenAddr(port int64) (portListenAddr string, err error) {
func GetPortListenAddr(port int64) (portListenAddr string, err error) {
addrs, err := net.InterfaceAddrs()

if err != nil {
Expand All @@ -36,11 +36,12 @@ func getPortListenAddr(port int64) (portListenAddr string, err error) {
addrAndPort := fmt.Sprintf("%s:%d", adds, port)
conn, err := net.DialTimeout("tcp", addrAndPort, time.Second*1)

if err == nil && conn != nil {
conn.Close()
portListenAddr = adds
break
if err != nil {
continue
}
conn.Close()
portListenAddr = adds
break

}
return
Expand All @@ -50,16 +51,16 @@ func DoKubeletCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap,
// 通过kubelet prometheus 接口拿到数据后做ETL
// 根据docker inspect 接口拿到所有容器的数据,根据podName一致找到pause 容器的label 给对应的pod数据
start := time.Now()
kubeletAddr, err := getPortListenAddr(cg.KubeletC.Port)

if kubeletAddr == "" {
level.Warn(logger).Log("msg", "getPortListenAddrEmptyKubeletAddr", "err:", err, "port", cg.KubeletC.Port)

} else {

cg.KubeletC.Addr = fmt.Sprintf("%s://%s:%d/%s", cg.KubeletC.Scheme, kubeletAddr, cg.KubeletC.Port, cg.KubeletC.MetricsPath)
level.Info(logger).Log("msg", "getPortListenAddrForKubeletAddr", "port", cg.KubeletC.Port, "ipaddr", kubeletAddr, "kubeletPath", cg.KubeletC.Addr)
}
//kubeletAddr, err := GetPortListenAddr(cg.KubeletC.Port)
//
//if kubeletAddr == "" {
// level.Warn(logger).Log("msg", "getPortListenAddrEmptyKubeletAddr", "err:", err, "port", cg.KubeletC.Port)
//
//} else {
//
// cg.KubeletC.Addr = fmt.Sprintf("%s://%s:%d/%s", cg.KubeletC.Scheme, kubeletAddr, cg.KubeletC.Port, cg.KubeletC.MetricsPath)
// level.Info(logger).Log("msg", "getPortListenAddrForKubeletAddr", "port", cg.KubeletC.Port, "ipaddr", kubeletAddr, "kubeletPath", cg.KubeletC.Addr)
//}
if cg.KubeletC.Addr == "" && len(cg.KubeletC.UserSpecifyAddrs) > 0 {
cg.KubeletC.Addr = cg.KubeletC.UserSpecifyAddrs[0]
}
Expand Down
2 changes: 1 addition & 1 deletion collect/kubelet_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func DoKubeletNodeOnNodeCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, funcName string) {

start := time.Now()
kubeletNodeAddr, err := getPortListenAddr(cg.KubeletNodeC.Port)
kubeletNodeAddr, err := GetPortListenAddr(cg.KubeletNodeC.Port)

if kubeletNodeAddr == "" {
level.Warn(logger).Log("msg", "getPortListenAddrEmptykubeletNodeAddr", "funcName", funcName, "err:", err, "port", cg.KubeletNodeC.Port)
Expand Down
12 changes: 11 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/n9e/k8s-mon/collect"
Expand Down Expand Up @@ -109,8 +110,17 @@ func main() {
// kubelet_agent .
dataM := collect.NewHistoryMap()
g.Add(func() error {
kubeletAddr, err := collect.GetPortListenAddr(sConfig.KubeletC.Port)
if kubeletAddr == "" {
level.Warn(logger).Log("msg", "getPortListenAddrEmptyKubeletAddr", "err:", err, "port", sConfig.KubeletC.Port)

err := collect.CommonCollectTickerForWithDataM(sConfig, ctxAll, logger, dataM, collect.DoKubeletCollect, config.FUNCNAME_KUBELET)
} else {

sConfig.KubeletC.Addr = fmt.Sprintf("%s://%s:%d/%s", sConfig.KubeletC.Scheme, kubeletAddr, sConfig.KubeletC.Port, sConfig.KubeletC.MetricsPath)
level.Info(logger).Log("msg", "getPortListenAddrForKubeletAddr", "port", sConfig.KubeletC.Port, "ipaddr", kubeletAddr, "kubeletPath", sConfig.KubeletC.Addr)
}

err = collect.CommonCollectTickerForWithDataM(sConfig, ctxAll, logger, dataM, collect.DoKubeletCollect, config.FUNCNAME_KUBELET)
if err != nil {
level.Error(logger).Log("msg", "kubelet-collect-manager stopped")
}
Expand Down

0 comments on commit 81cbd53

Please sign in to comment.