Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add egress related metrics #306

Merged
merged 4 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs/cmd-coil-egress.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,61 @@ This is the number of client pods which use the egress.
| ----------- | ----------------------------- |
| `namespace` | The egress resource namespace |
| `egress` | The egress resource name |

### `coil_egress_client_pod_info`

This is the client pod information.

| Label | Description |
| -------------------| ----------------------------- |
| `namespace` | The pod resource namespace |
| `pod` | The pod name |
| `pod_ip` | The pod's IP address |
| `interface` | The interface for the pod |
| `egress` | The egress resource name |
| `egress_namespace` | The egress resource namespace |

### `coil_egress_nf_conntrack_entries_limit`

This is the limit of conntrack entries in the kernel.
This value is from `/proc/sys/net/netfilter/nf_conntrack_max`.

| Label | Description |
| ----------- | ----------------------------- |
| `namespace` | The egress resource namespace |
| `egress` | The egress resource name |
| `pod` | The pod name |


### `coil_egress_nf_conntrack_entries`

This is the number of conntrack entries in the kernel.
This value is from `/proc/sys/net/netfilter/nf_conntrack_count`.

| Label | Description |
| ----------- | ----------------------------- |
| `namespace` | The egress resource namespace |
| `egress` | The egress resource name |
| `pod` | The pod name |

### `coil_egress_nftables_masqueraded_packets_total`

This is the total number of packets masqueraded by iptables in an egress NAT pod.
This value is from the result of `iptables -t nat -L POSTROUTING -vn`.

| Label | Description |
| ----------- | ----------------------------- |
| `namespace` | The egress resource namespace |
| `egress` | The egress resource name |
| `pod` | The pod name |

### `coil_egress_nftables_masqueraded_bytes_total`

This is the total number of bytes of packets masqueraded by iptables in an egress NAT pod.
This value is from the result of `iptables -t nat -L POSTROUTING -vn`.

| Label | Description |
| ----------- | ----------------------------- |
| `namespace` | The egress resource namespace |
| `egress` | The egress resource name |
| `pod` | The pod name |
17 changes: 17 additions & 0 deletions v2/cmd/coil-egress/sub/run.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sub

import (
"context"
"errors"
"net"
"os"
Expand All @@ -11,12 +12,14 @@ import (
"github.com/cybozu-go/coil/v2/controllers"
"github.com/cybozu-go/coil/v2/pkg/constants"
"github.com/cybozu-go/coil/v2/pkg/founat"
egressMetrics "github.com/cybozu-go/coil/v2/pkg/metrics"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

Expand All @@ -33,6 +36,11 @@ func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

// +kubebuilder:scaffold:scheme

metrics.Registry.MustRegister(egressMetrics.NfConnctackCount)
metrics.Registry.MustRegister(egressMetrics.NfConnctackLimit)
metrics.Registry.MustRegister(egressMetrics.NfTableMasqueradeBytes)
metrics.Registry.MustRegister(egressMetrics.NfTableMasqueradePackets)
}

func subMain() error {
Expand Down Expand Up @@ -105,6 +113,15 @@ func subMain() error {
return err
}

setupLog.Info("setup egress metrics collector")
runner := egressMetrics.NewRunner()
egressCollector, err := egressMetrics.NewEgressCollector(myNS, os.Getenv("HOSTNAME"), myName)
if err != nil {
return err
}
runner.Register(egressCollector)
go runner.Run(context.Background())

setupLog.Info("starting manager", "version", v2.Version())
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
Expand Down
15 changes: 14 additions & 1 deletion v2/controllers/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (t *mockFoUTunnel) AddPeer(ip net.IP, sportAuto bool) (netlink.Link, error)
defer t.mu.Unlock()

t.peers[ip.String()] = sportAuto
return nil, nil
return &mockLink{name: ip.String()}, nil
}

func (t *mockFoUTunnel) DelPeer(ip net.IP) error {
Expand Down Expand Up @@ -211,3 +211,16 @@ func (e *mockEgress) GetClients() map[string]bool {
}
return m
}

// mockLink is a minimal netlink.Link implementation for tests.
// mockFoUTunnel.AddPeer returns one per peer so that callers can read
// the link name through Attrs().Name.
type mockLink struct {
// name is the value reported as LinkAttrs.Name by Attrs.
name string
}

// Attrs returns freshly allocated link attributes in which only Name is
// populated, taken from the mock's name field.
func (m *mockLink) Attrs() *netlink.LinkAttrs {
	attrs := new(netlink.LinkAttrs)
	attrs.Name = m.name
	return attrs
}

// Type reports the fixed link type string used by the mock.
func (m *mockLink) Type() string {
	const linkType = "mock-link"
	return linkType
}
44 changes: 34 additions & 10 deletions v2/controllers/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

var (
clientPods = prometheus.NewGaugeVec(
ClientPods = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: constants.MetricsNS,
Subsystem: "egress",
Expand All @@ -34,17 +34,29 @@ var (
},
[]string{"namespace", "egress"},
)

ClientPodInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: constants.MetricsNS,
Subsystem: "egress",
Name: "client_pod_info",
Help: "information of a client pod which uses this egress",
},
[]string{"namespace", "pod", "pod_ip", "interface", "egress", "egress_namespace"},
)
)

func init() {
metrics.Registry.MustRegister(clientPods)
metrics.Registry.MustRegister(ClientPods)
metrics.Registry.MustRegister(ClientPodInfo)
}

// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch

// SetupPodWatcher registers pod watching reconciler to mgr.
func SetupPodWatcher(mgr ctrl.Manager, ns, name string, ft founat.FoUTunnel, encapSportAuto bool, eg founat.Egress, cfg *rest.Config) error {
clientPods.Reset()
ClientPods.Reset()
ClientPodInfo.Reset()

r := &podWatcher{
client: mgr.GetClient(),
Expand All @@ -53,7 +65,7 @@ func SetupPodWatcher(mgr ctrl.Manager, ns, name string, ft founat.FoUTunnel, enc
ft: ft,
encapSportAuto: encapSportAuto,
eg: eg,
metric: clientPods.WithLabelValues(ns, name),
clientPods: ClientPods.WithLabelValues(ns, name),
podAddrs: make(map[string][]net.IP),
peers: make(map[string]map[string]struct{}),
}
Expand Down Expand Up @@ -107,7 +119,7 @@ type podWatcher struct {
ft founat.FoUTunnel
encapSportAuto bool
eg founat.Egress
metric prometheus.Gauge
clientPods prometheus.Gauge

mu sync.Mutex
podAddrs map[string][]net.IP
Expand Down Expand Up @@ -216,6 +228,8 @@ OUTER:
if err := r.eg.AddClient(ip, link); err != nil {
return err
}
metric := ClientPodInfo.WithLabelValues(pod.GetNamespace(), pod.GetName(), ip.String(), link.Attrs().Name, r.myName, r.myNS)
metric.Set(1)
}

OUTER2:
Expand All @@ -229,6 +243,9 @@ OUTER2:
if err := r.ft.DelPeer(eip); err != nil {
return err
}
if n := ClientPodInfo.DeletePartialMatch(prometheus.Labels{"namespace": pod.GetNamespace(), "pod": pod.GetName(), "pod_ip": eip.String(), "egress": r.myName, "egress_namespace": r.myNS}); n != 1 {
logger.Error(errors.New("metrics deletion error"), "the number of deleted metrics is not one for", "pod", pod.GetName(), "namespace", pod.GetNamespace())
}
}

r.podAddrs[key] = podIPs
Expand All @@ -241,25 +258,29 @@ OUTER2:
keySet[key] = struct{}{}
}

r.metric.Set(float64(len(r.podAddrs)))
r.clientPods.Set(float64(len(r.podAddrs)))
return nil
}

func (r *podWatcher) delTerminatedPod(pod *corev1.Pod, logger logr.Logger) error {
r.mu.Lock()
defer r.mu.Unlock()

return r.delPeer(pod.Namespace+"/"+pod.Name, "delTerminatedPod", string(pod.Status.Phase), logger)
return r.delPeer(types.NamespacedName{Namespace: pod.GetNamespace(), Name: pod.GetName()}, "delTerminatedPod", string(pod.Status.Phase), logger)
}

func (r *podWatcher) delPod(n types.NamespacedName, logger logr.Logger) error {
r.mu.Lock()
defer r.mu.Unlock()

return r.delPeer(n.Namespace+"/"+n.Name, "delPod", "", logger)
if err := r.delPeer(n, "delPod", "", logger); err != nil {
return err
}
return nil
}

func (r *podWatcher) delPeer(key, caller, podPhase string, logger logr.Logger) error {
func (r *podWatcher) delPeer(n types.NamespacedName, caller, podPhase string, logger logr.Logger) error {
key := n.Namespace + "/" + n.Name
for _, ip := range r.podAddrs[key] {
existsLivePeers, err := r.existsOtherLivePeers(key, ip.String())
if err != nil {
Expand All @@ -278,10 +299,13 @@ func (r *podWatcher) delPeer(key, caller, podPhase string, logger logr.Logger) e
delete(r.peers, ip.String())
}
}
if deleted := ClientPodInfo.DeletePartialMatch(prometheus.Labels{"namespace": n.Namespace, "pod": n.Name, "pod_ip": ip.String(), "egress": r.myName, "egress_namespace": r.myNS}); deleted != 1 {
logger.Error(errors.New("metrics deletion error"), "the number of deleted metrics is not one for", "pod", n.Name, "namespace", n.Namespace)
}
}

delete(r.podAddrs, key)
r.metric.Set(float64(len(r.podAddrs)))
r.clientPods.Set(float64(len(r.podAddrs)))
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/go-logr/logr v1.4.2
github.com/go-logr/zapr v1.3.0
github.com/google/go-cmp v0.6.0
github.com/google/nftables v0.2.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/onsi/ginkgo/v2 v2.19.0
Expand Down Expand Up @@ -56,9 +57,12 @@ require (
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mdlayher/netlink v1.7.2 // indirect
github.com/mdlayher/socket v0.5.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
8 changes: 8 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/nftables v0.2.0 h1:PbJwaBmbVLzpeldoeUKGkE2RjstrjPKMl6oLrfEJ6/8=
github.com/google/nftables v0.2.0/go.mod h1:Beg6V6zZ3oEn0JuiUQ4wqwuyqqzasOltcoXPtgLbFp4=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
Expand All @@ -70,6 +72,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand All @@ -85,6 +89,10 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g=
github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw=
github.com/mdlayher/socket v0.5.0 h1:ilICZmJcQz70vrWVes1MFera4jGiWNocSkykwwoy3XI=
github.com/mdlayher/socket v0.5.0/go.mod h1:WkcBFfvyG8QENs5+hfQPl1X6Jpd2yeLIYgrGFmJiJxI=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
55 changes: 55 additions & 0 deletions v2/pkg/metrics/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package metrics

import (
"context"
"sync"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"
)

// Collector is a source of metrics that a Runner refreshes periodically.
type Collector interface {
	// Name identifies the collector; the Runner includes it in the log
	// entry written when Update fails.
	Name() string

	// Update refreshes the collector's metrics. A non-nil error is
	// logged by the Runner and does not stop later collection rounds.
	Update() error
}

// Runner periodically invokes Update on a set of registered Collectors.
type Runner struct {
// collectors holds every Collector added through Register.
collectors []Collector
// interval is the period between two collection rounds in Run.
interval time.Duration
}

// NewRunner returns a Runner with an empty collector list and a
// collection interval of 30 seconds.
func NewRunner() *Runner {
	runner := new(Runner)
	runner.collectors = []Collector{}
	runner.interval = time.Second * 30
	return runner
}

// Register appends collector to the list of collectors that each
// collection round will update.
func (r *Runner) Register(collector Collector) {
	registered := append(r.collectors, collector)
	r.collectors = registered
}

// Run triggers a collection round every r.interval until ctx is cancelled.
//
// Run blocks, so callers normally start it in its own goroutine. The first
// round happens one interval after Run is called, not immediately. When ctx
// is done the ticker is stopped and Run returns, which lets the goroutine
// terminate instead of leaking for the lifetime of the process. With a
// context that is never cancelled (e.g. context.Background()) it runs
// forever, exactly as before.
func (r *Runner) Run(ctx context.Context) {
	ticker := time.NewTicker(r.interval)
	// Release the ticker's resources as soon as the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r.collect(ctx)
		}
	}
}

// collect runs one collection round: every registered collector is updated
// concurrently in its own goroutine, and collect returns only after all of
// them have finished. A failing collector is logged with its name and does
// not affect the others.
func (r *Runner) collect(ctx context.Context) {
	logger := log.FromContext(ctx)

	var pending sync.WaitGroup
	for _, collector := range r.collectors {
		pending.Add(1)
		// Pass the collector as an argument so each goroutine works on
		// its own value regardless of loop-variable semantics.
		go func(c Collector) {
			defer pending.Done()
			err := c.Update()
			if err == nil {
				return
			}
			logger.Error(err, "failed to collect metrics", "name", c.Name())
		}(collector)
	}
	pending.Wait()
}
Loading
Loading