Skip to content

Commit

Permalink
fix(sync): fix delete pod wrongly caused by informer exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
WeeNews authored and dakehero committed Jul 17, 2023
1 parent c4a17e4 commit f1c0e59
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 25 deletions.
1 change: 1 addition & 0 deletions chart/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ spec:
name: host-cilium-run
- mountPath: /var/run/netns
name: host-run-netns
mountPropagation: HostToContainer
- mountPath: /opt/cni/bin
name: host-cni-bin-dir
- mountPath: /var/log/cello
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func Execute() {
if err := MigrateLocalPodDB(); err != nil {
if err := UpdateLocalPodDB(); err != nil {
log.Fatalf("Convert pod format in persistence db before build daemon failed, %v", err)
}

Expand Down
118 changes: 94 additions & 24 deletions pkg/daemon/prestart.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,33 @@ import (
"fmt"
"net"

"github.com/containernetworking/plugins/pkg/ns"
"github.com/vishvananda/netlink"

"github.com/volcengine/cello/pkg/utils/netns"
"github.com/volcengine/cello/types"
)

// MigrateLocalPodDB migrate local Pod persistence DB to the current version.
func MigrateLocalPodDB() error {
// UpdateLocalPodDB migrates the local Pod persistence DB to the current version
// and fills in a missing NetNs for pods whose IP is found in an existing network namespace.
func UpdateLocalPodDB() error {
log.Infof("Start convert pod format in persistence db")

ipNsMapping := map[string]string{}
namespaces, err := netns.ListAllNetNs()
if err != nil {
return fmt.Errorf("list all netns failed, %v", err)
}
for _, namespace := range namespaces {
ips, err := GetPodIPs(namespace)
if err != nil {
log.Error("get ips of pod %s failed, %v", namespace, err)
continue
}
for _, ip := range ips {
ipNsMapping[ip] = namespace
}
}

podPersist, err := newPodPersistenceManager(podPersistencePath, "pod")
if err != nil {
return fmt.Errorf("open persistence db failed: %v", err)
Expand All @@ -35,38 +56,87 @@ func MigrateLocalPodDB() error {
}

for _, pod := range pods {
if pod.PodNetworkMode != "" {
if pod.PodNetworkMode != "" && pod.NetNs != "" {
continue
}
resIPv4, _, err := net.ParseCIDR(pod.MainInterface.IPv4Addr)
if err != nil {
return fmt.Errorf("parse ipv4Addr %s failed, %v", pod.MainInterface.IPv4Addr, err)
}
resIPSet := types.IPSet{
IPv4: resIPv4,
IPv6: nil,
if pod.PodNetworkMode == "" {
resIPv4, _, err := net.ParseCIDR(pod.MainInterface.IPv4Addr)
if err != nil {
return fmt.Errorf("parse ipv4Addr %s failed, %v", pod.MainInterface.IPv4Addr, err)
}
resIPSet := types.IPSet{
IPv4: resIPv4,
IPv6: nil,
}
resId := fmt.Sprintf("%s/%s", pod.MainInterface.ENI.ID, resIPSet.String())
resType := types.NetResourceTypeEniIp
pod.PodNetworkMode = types.PodNetworkModeENIShare
if !pod.IsMainInterfaceSharedMode {
resType = types.NetResourceTypeEni
resId = pod.MainInterface.ENI.ID
pod.PodNetworkMode = types.PodNetworkModeENIExclusive
}
pod.Resources = append(pod.Resources, types.VPCResource{
Type: resType,
ID: resId,
ENIId: pod.MainInterface.ENI.ID,
ENIMac: pod.MainInterface.ENI.Mac,
IPv4: resIPv4.String(),
})
}
resId := fmt.Sprintf("%s/%s", pod.MainInterface.ENI.ID, resIPSet.String())
resType := types.NetResourceTypeEniIp
pod.PodNetworkMode = types.PodNetworkModeENIShare
if !pod.IsMainInterfaceSharedMode {
resType = types.NetResourceTypeEni
resId = pod.MainInterface.ENI.ID
pod.PodNetworkMode = types.PodNetworkModeENIExclusive
if pod.NetNs == "" {
podIP, _, err := net.ParseCIDR(pod.MainInterface.IPv4Addr)
if err != nil {
return fmt.Errorf("parse ipv4Addr %s failed, %v", pod.MainInterface.IPv4Addr, err)
}
if nsPath, exist := ipNsMapping[podIP.String()]; exist {
pod.NetNs = nsPath
}
}
pod.Resources = append(pod.Resources, types.VPCResource{
Type: resType,
ID: resId,
ENIId: pod.MainInterface.ENI.ID,
ENIMac: pod.MainInterface.ENI.Mac,
IPv4: resIPv4.String(),
})

err = podPersist.Put(pod)
if err != nil {
return fmt.Errorf("put pod %v to persistence db failed, %v", pod, err)
}
}

podPersist.Close()
log.Infof("Convert pod format in persistence db success")
return nil
}

// GetPodIPs returns the string form of every IP address configured on any
// link inside the network namespace located at nsPath.
//
// Links whose addresses cannot be listed are logged and skipped so that one
// bad link does not hide the addresses of the others. An error is returned
// when the namespace cannot be opened or when enumerating links inside the
// namespace fails; in that case the returned slice is nil.
func GetPodIPs(nsPath string) ([]string, error) {
	nsHandle, err := ns.GetNS(nsPath)
	if err != nil {
		// Return the error with context instead of logging it here as well;
		// the caller decides how to report it.
		return nil, fmt.Errorf("get ns handle of %s failed, %w", nsPath, err)
	}
	defer func() {
		// Use a dedicated variable so a close failure never clobbers the
		// function's own error value.
		if closeErr := nsHandle.Close(); closeErr != nil {
			log.Errorf("close netns %s failed, %v", nsPath, closeErr)
		}
	}()

	var ips []string
	err = nsHandle.Do(func(_ ns.NetNS) error {
		links, err := netlink.LinkList()
		if err != nil {
			return fmt.Errorf("list link failed, %v", err)
		}
		for _, link := range links {
			addrs, err := netlink.AddrList(link, netlink.FAMILY_ALL)
			if err != nil {
				log.Errorf("list addr of %s failed, %v", link.Attrs().Name, err)
				continue
			}
			for _, addr := range addrs {
				if addr.IPNet != nil {
					ips = append(ips, addr.IPNet.IP.String())
				}
			}
		}
		return nil
	})
	if err != nil {
		// Previously this error was silently dropped and a nil error returned.
		return nil, err
	}
	return ips, nil
}
18 changes: 18 additions & 0 deletions pkg/utils/netns/netns.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package netns

import (
"fmt"
"os"

"github.com/vishvananda/netns"
)

const nsRootPath = "/var/run/netns"

// CheckNetNsExist check if netns exists,
// if there is an error, return false and error.
func CheckNetNsExist(nsPath string) (bool, error) {
Expand All @@ -36,3 +39,18 @@ func CheckNetNsExist(nsPath string) (bool, error) {
}(&h)
return true, nil
}

// ListAllNetNs returns the paths of all non-directory entries found directly
// under nsRootPath, each formatted as "<nsRootPath>/<entry name>".
// When the directory cannot be read, a nil slice and an error describing the
// failure are returned.
func ListAllNetNs() ([]string, error) {
	entries, readErr := os.ReadDir(nsRootPath)
	if readErr != nil {
		return nil, fmt.Errorf("read dir %s failed: %v", nsRootPath, readErr)
	}

	var nsPaths []string
	for _, entry := range entries {
		// Only plain entries are reported; sub-directories are ignored.
		if !entry.IsDir() {
			nsPaths = append(nsPaths, fmt.Sprintf("%s/%s", nsRootPath, entry.Name()))
		}
	}
	return nsPaths, nil
}

0 comments on commit f1c0e59

Please sign in to comment.