diff --git a/collect/apiserver.go b/collect/apiserver.go index 666aea9..ddc9ae4 100644 --- a/collect/apiserver.go +++ b/collect/apiserver.go @@ -2,6 +2,7 @@ package collect import ( "github.com/go-kit/kit/log" + "github.com/n9e/k8s-mon/config" ) diff --git a/collect/cadvisor.go b/collect/cadvisor.go index f79e5d1..5d1d60b 100644 --- a/collect/cadvisor.go +++ b/collect/cadvisor.go @@ -3,9 +3,6 @@ package collect import ( "encoding/json" "fmt" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/spf13/viper" "net" "net/http" "path" @@ -15,8 +12,11 @@ import ( "time" "github.com/docker/docker/api/types" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/google/cadvisor/client" v1 "github.com/google/cadvisor/info/v1" + "github.com/spf13/viper" "github.com/toolkits/pkg/file" "github.com/toolkits/pkg/logger" diff --git a/collect/common.go b/collect/common.go index 86f2a50..1f64828 100644 --- a/collect/common.go +++ b/collect/common.go @@ -4,18 +4,67 @@ import ( "context" "crypto/md5" "fmt" - "github.com/didi/nightingale/src/common/dataobj" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/n9e/k8s-mon/config" - config_util "github.com/prometheus/common/config" "io" "io/ioutil" + "net" "net/http" "net/url" + "os" "time" + + "github.com/didi/nightingale/src/common/dataobj" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + config_util "github.com/prometheus/common/config" + + "github.com/n9e/k8s-mon/config" ) +func GetHostName(logger log.Logger) string { + name, err := os.Hostname() + if err != nil { + level.Error(logger).Log("msg", "GetHostNameError", "err", err) + } + return name + +} + +func GetPortListenAddr(port int64) (portListenAddr string, err error) { + addrs, err := net.InterfaceAddrs() + + if err != nil { + return "", err + } + for _, address := range addrs { + + ipnet, ok := address.(*net.IPNet) + if !ok { + continue + } + + addr := ipnet.IP.To4() + if addr == nil { + continue + } + // 检查ip地址判断是否回环地址 + if ipnet.IP.IsLoopback() { + continue + } + adds := addr.String() + addrAndPort := fmt.Sprintf("%s:%d", adds, port) + conn, err := net.DialTimeout("tcp", addrAndPort, time.Second*1) + + if err != nil { + continue + } + conn.Close() + portListenAddr = adds + break + + } + return +} + func CommonCollectTickerForWithDataM(cg *config.Config, ctx context.Context, logger log.Logger, dataMap *HistoryMap, collectFunc func(*config.Config, log.Logger, *HistoryMap, string), funcName string) error { ticker := time.NewTicker(time.Second * (time.Duration(cg.Step))) level.Info(logger).Log("msg", "CommonCollectTickerForWithDataM start....", "funcName", funcName) diff --git a/collect/const.go b/collect/const.go deleted file mode 100644 index 3d4ede7..0000000 --- a/collect/const.go +++ /dev/null @@ -1,3 +0,0 @@ -package collect - -const () diff --git a/collect/coredns.go b/collect/coredns.go index 0720553..c56fb4b 100644 --- a/collect/coredns.go +++ b/collect/coredns.go @@ -2,6 +2,7 @@ package collect import ( "github.com/go-kit/kit/log" + "github.com/n9e/k8s-mon/config" ) diff --git a/collect/get_node.go b/collect/get_node.go index 991a746..506fa5a 100644 --- a/collect/get_node.go +++ b/collect/get_node.go @@ -2,14 +2,16 @@ package collect import ( "context" + "time" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - kconfig "github.com/n9e/k8s-mon/config" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "time" + + kconfig "github.com/n9e/k8s-mon/config" ) func GetServerAddrByGetNode(logger log.Logger, dataMap *HistoryMap) { diff --git a/collect/get_pod.go b/collect/get_pod.go index 9590488..4fdd439 100644 --- a/collect/get_pod.go +++ b/collect/get_pod.go @@ -2,13 +2,15 @@ package collect import ( "context" + "time" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - kconfig "github.com/n9e/k8s-mon/config" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "time" + + kconfig "github.com/n9e/k8s-mon/config" ) func GetServerAddrAll(logger log.Logger, dataMap *HistoryMap) { diff --git a/collect/kube_controller_manager.go b/collect/kube_controller_manager.go index 30a80fb..e0fc864 100644 --- a/collect/kube_controller_manager.go +++ b/collect/kube_controller_manager.go @@ -2,6 +2,7 @@ package collect import ( "github.com/go-kit/kit/log" + "github.com/n9e/k8s-mon/config" ) diff --git a/collect/kube_proxy.go b/collect/kube_proxy.go index 6e8f667..61e9086 100644 --- a/collect/kube_proxy.go +++ b/collect/kube_proxy.go @@ -2,11 +2,13 @@ package collect import ( "fmt" + "time" + "github.com/didi/nightingale/src/common/dataobj" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/n9e/k8s-mon/config" - "time" ) func DoKubeProxyOnNodeCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, funcName string) { diff --git a/collect/kube_scheduler.go b/collect/kube_scheduler.go index 7d1e0e9..5513132 100644 --- a/collect/kube_scheduler.go +++ b/collect/kube_scheduler.go @@ -2,6 +2,7 @@ package collect import ( "github.com/go-kit/kit/log" + "github.com/n9e/k8s-mon/config" ) diff --git a/collect/kube_state_metrics.go b/collect/kube_state_metrics.go index 4a5c492..060f9d5 100644 --- a/collect/kube_state_metrics.go +++ b/collect/kube_state_metrics.go @@ -2,12 +2,14 @@ package collect import ( "fmt" + "strings" + "time" + "github.com/didi/nightingale/src/common/dataobj" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/n9e/k8s-mon/config" - "strings" - "time" ) func DoKubeStatsMetricsCollect(cg *config.Config, logger log.Logger, funcName string) { @@ -160,6 +162,10 @@ func DoKubeStatsMetricsCollect(cg *config.Config, logger log.Logger, funcName st if nid, loaded := nidM[kk]; loaded { metric.Nid = nid } + if metric.Nid == "" { + metric.Nid = cg.ServerSideNid + } + if _, loaded := tagWhiteM[k]; !loaded { delete(metric.TagsMap, k) } diff --git a/collect/kubelet_cadvisor.go b/collect/kubelet_cadvisor.go index dd200a0..3377ea6 100644 --- a/collect/kubelet_cadvisor.go +++ b/collect/kubelet_cadvisor.go @@ -1,66 +1,20 @@ package collect import ( - "fmt" + "strings" + "time" + "github.com/didi/nightingale/src/common/dataobj" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/n9e/k8s-mon/config" - "net" - "strings" - "time" ) -func GetPortListenAddr(port int64) (portListenAddr string, err error) { - addrs, err := net.InterfaceAddrs() - - if err != nil { - return "", err - } - for _, address := range addrs { - - ipnet, ok := address.(*net.IPNet) - if !ok { - continue - } - - addr := ipnet.IP.To4() - if addr == nil { - continue - } - // 检查ip地址判断是否回环地址 - if ipnet.IP.IsLoopback() { - continue - } - adds := addr.String() - addrAndPort := fmt.Sprintf("%s:%d", adds, port) - conn, err := net.DialTimeout("tcp", addrAndPort, time.Second*1) - - if err != nil { - continue - } - conn.Close() - portListenAddr = adds - break - - } - return -} - func DoKubeletCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, funcName string) { // 通过kubelet prometheus 接口拿到数据后做ETL // 根据docker inspect 接口拿到所有容器的数据,根据podName一致找到pause 容器的label 给对应的pod数据 start := time.Now() - //kubeletAddr, err := GetPortListenAddr(cg.KubeletC.Port) - // - //if kubeletAddr == "" { - // level.Warn(logger).Log("msg", "getPortListenAddrEmptyKubeletAddr", "err:", err, "port", cg.KubeletC.Port) - // - //} else { - // - // cg.KubeletC.Addr = fmt.Sprintf("%s://%s:%d/%s", cg.KubeletC.Scheme, kubeletAddr, cg.KubeletC.Port, cg.KubeletC.MetricsPath) - // level.Info(logger).Log("msg", "getPortListenAddrForKubeletAddr", "port", cg.KubeletC.Port, "ipaddr", kubeletAddr, "kubeletPath", cg.KubeletC.Addr) - //} if cg.KubeletC.Addr == "" && len(cg.KubeletC.UserSpecifyAddrs) > 0 { cg.KubeletC.Addr = cg.KubeletC.UserSpecifyAddrs[0] } @@ -207,6 +161,13 @@ func DoKubeletCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, delete(metric.TagsMap, k) } } + // 添加个node_ip label + if cg.KubeletC.HostIp != "" { + metric.TagsMap["node_ip"] = cg.KubeletC.HostIp + } + if cg.KubeletC.HostName != "" { + metric.TagsMap["node_name"] = cg.KubeletC.HostName + } // tags string metric.Tags = makeAppendTags(metric.TagsMap, cg.AppendTags) diff --git a/collect/kubelet_node.go b/collect/kubelet_node.go index 6d299d0..e335a86 100644 --- a/collect/kubelet_node.go +++ b/collect/kubelet_node.go @@ -2,11 +2,13 @@ package collect import ( "fmt" + "time" + "github.com/didi/nightingale/src/common/dataobj" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/n9e/k8s-mon/config" - "time" ) func DoKubeletNodeOnNodeCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, funcName string) { diff --git a/collect/parse_test.go b/collect/parse_test.go index 41b4720..112e20c 100644 --- a/collect/parse_test.go +++ b/collect/parse_test.go @@ -2,11 +2,12 @@ package collect import ( "encoding/json" - "github.com/prometheus/common/promlog" "io/ioutil" "log" "os" "testing" + + "github.com/prometheus/common/promlog" ) func TestParseInfValue(t *testing.T) { diff --git a/collect/parser.go b/collect/parser.go index 166ac6d..f642335 100644 --- a/collect/parser.go +++ b/collect/parser.go @@ -5,17 +5,16 @@ import ( "bytes" "errors" "fmt" - "github.com/go-kit/kit/log" "math" "strconv" "strings" "time" "github.com/didi/nightingale/src/common/dataobj" + "github.com/go-kit/kit/log" + fmodel "github.com/open-falcon/falcon-plus/common/model" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" - - fmodel "github.com/open-falcon/falcon-plus/common/model" ) func valueConv(valueUntyped interface{}) (error, float64) { diff --git a/collect/push.go b/collect/push.go index 42a1ffb..49d29d5 100644 --- a/collect/push.go +++ b/collect/push.go @@ -3,12 +3,13 @@ package collect import ( "bytes" "encoding/json" - "github.com/didi/nightingale/src/common/dataobj" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "io/ioutil" "net/http" "time" + + "github.com/didi/nightingale/src/common/dataobj" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" ) func PushWork(url string, tt int64, metricList []dataobj.MetricValue, logger log.Logger, funcName string) { diff --git a/config/config.go b/config/config.go index d654b0c..3563720 100644 --- a/config/config.go +++ b/config/config.go @@ -3,13 +3,14 @@ package config import ( "errors" "fmt" + "io/ioutil" + "net/url" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" config_util "github.com/prometheus/common/config" "github.com/spf13/viper" "gopkg.in/yaml.v2" - "io/ioutil" - "net/url" ) type Config struct { @@ -34,6 +35,8 @@ type Config struct { } type CommonApiServerConfig struct { + HostName string `yaml:"-"` + HostIp string `yaml:"-"` HashModNum uint64 `yaml:"hash_mod_num"` HashModShard uint64 `yaml:"hash_mod_shard"` ConcurrencyLimit int64 `yaml:"concurrency_limit"` diff --git a/main.go b/main.go index 93ed845..aa72f29 100644 --- a/main.go +++ b/main.go @@ -3,21 +3,22 @@ package main import ( "context" "fmt" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/n9e/k8s-mon/collect" + "github.com/oklog/run" "github.com/prometheus/common/promlog" promlogflag "github.com/prometheus/common/promlog/flag" "github.com/prometheus/common/version" "gopkg.in/alecthomas/kingpin.v2" - "os" - "os/signal" - "path/filepath" - "syscall" - "time" + "github.com/n9e/k8s-mon/collect" "github.com/n9e/k8s-mon/config" - "github.com/oklog/run" ) func main() { @@ -110,12 +111,15 @@ func main() { // kubelet_agent . dataM := collect.NewHistoryMap() g.Add(func() error { + hostname := collect.GetHostName(logger) + sConfig.KubeletC.HostName = hostname + kubeletAddr, err := collect.GetPortListenAddr(sConfig.KubeletC.Port) if kubeletAddr == "" { level.Warn(logger).Log("msg", "getPortListenAddrEmptyKubeletAddr", "err:", err, "port", sConfig.KubeletC.Port) } else { - + sConfig.KubeletC.HostIp = kubeletAddr sConfig.KubeletC.Addr = fmt.Sprintf("%s://%s:%d/%s", sConfig.KubeletC.Scheme, kubeletAddr, sConfig.KubeletC.Port, sConfig.KubeletC.MetricsPath) level.Info(logger).Log("msg", "getPortListenAddrForKubeletAddr", "port", sConfig.KubeletC.Port, "ipaddr", kubeletAddr, "kubeletPath", sConfig.KubeletC.Addr) }