Commit
Merge pull request #75 from TheBeatles1994/bugfix/fix-update-nls-failed
adjust the order in which locks are acquired in the code
TheBeatles1994 authored Nov 26, 2021
2 parents d66a5e2 + 876b02c commit dfb46e3
Showing 4 changed files with 47 additions and 46 deletions.
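
The bulk of the diff applies one pattern to every event handler in pkg/scheduler/server/eventhandlers.go: instead of taking e.Ctx.CtxLock at the top of the handler, the lock is acquired only after the type assertion, filtering, and (for the NodeLocalStorage update handler) the status sync have succeeded, so it is held just for the shared-cache mutation. Below is a minimal, self-contained sketch of that ordering with placeholder types, not the project's actual cache or API:

package main

import (
	"fmt"
	"sync"
)

// nodeCache stands in for the scheduler's shared cluster node cache.
type nodeCache struct {
	mu    sync.Mutex
	nodes map[string]struct{}
}

// onAdd mirrors the new handler shape: convert and filter the object first,
// without the lock held, then lock only around the shared-state write.
func (c *nodeCache) onAdd(obj interface{}) {
	name, ok := obj.(string)
	if !ok {
		fmt.Printf("cannot convert to string: %v\n", obj)
		return
	}
	// Slow or fallible work (validation, status sync, API calls) happens
	// above this point; the mutex now guards only the cache update.
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[name] = struct{}{}
}

func main() {
	c := &nodeCache{nodes: map[string]struct{}{}}
	c.onAdd("node-1")
	c.onAdd(42) // filtered out before the lock is ever taken
	fmt.Println(len(c.nodes)) // 1
}

Keeping the lock scope this narrow means a handler that bails out early (bad object, non-local PV, failed status update) never blocks the other handlers on CtxLock.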
1 change: 1 addition & 0 deletions pkg/agent/discovery/discovery.go
@@ -191,6 +191,7 @@ func (d *Discoverer) Discover() {
}

// only update status
log.Infof("update nls %s", nlsCopy.Name)
_, err = d.localclientset.CsiV1alpha1().NodeLocalStorages().UpdateStatus(context.Background(), nlsCopy, metav1.UpdateOptions{})
if err != nil {
log.Errorf("local storage CRD updateStatus error: %s", err.Error())
3 changes: 0 additions & 3 deletions pkg/agent/discovery/snapshot.go
@@ -67,9 +67,6 @@ func (d *Discoverer) ExpandSnapshotLVIfNeeded() {
log.Infof("[ExpandSnapshotLVIfNeeded]expand snapshot lv %s successfully", lv.Name())
}
}

// force update status of nls
d.Discover()
}

func getSnapshotInitialInfo(param map[string]string) (initialSize uint64, threshold float64, increaseSize uint64) {
54 changes: 24 additions & 30 deletions pkg/scheduler/server/eventhandlers.go
@@ -39,9 +39,6 @@ const (
)

func (e *ExtenderServer) onNodeLocalStorageAdd(obj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()

local, ok := obj.(*nodelocalstorage.NodeLocalStorage)
if !ok {
log.Errorf("cannot convert to *NodeLocalStorage: %v", obj)
@@ -53,6 +50,8 @@ func (e *ExtenderServer) onNodeLocalStorageAdd(obj interface{}) {

trace.Step("Computing add node cache")
nodeName := local.Name
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
if v := e.Ctx.ClusterNodeCache.GetNodeCache(nodeName); v == nil {
// Create a new node cache
log.Debugf("[onNodeLocalStorageAdd]Created new node cache for %s", nodeName)
@@ -64,8 +63,6 @@ func (e *ExtenderServer) onNodeLocalStorageAdd(obj interface{}) {
}

func (e *ExtenderServer) onNodeLocalStorageUpdate(oldObj, newObj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
local, ok := newObj.(*nodelocalstorage.NodeLocalStorage)
if !ok {
log.Errorf("cannot convert newObj to *NodeLocalStorage: %v", newObj)
@@ -79,8 +76,13 @@ func (e *ExtenderServer) onNodeLocalStorageUpdate(oldObj, newObj interface{}) {
log.Debugf("get update on node local cache %s", local.Name)

// trigger an status update according to spec
e.syncer.OnUpdateInitialized(local)
if err := e.syncer.OnUpdateInitialized(local); err != nil {
log.Errorf("[onNodeLocalStorageUpdate]update status of nls %s failed: %s", local.Name, err.Error())
return
}

e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
nodeName := local.Name
if v := e.Ctx.ClusterNodeCache.GetNodeCache(nodeName); v == nil {
// Create a new node cache
@@ -96,9 +98,6 @@ func (e *ExtenderServer) onNodeLocalStorageUpdate(oldObj, newObj interface{}) {
}

func (e *ExtenderServer) onPVAdd(obj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()

pv, ok := obj.(*corev1.PersistentVolume)
if !ok {
log.Errorf("cannot convert to *v1.PersistentVolume: %v", obj)
@@ -120,6 +119,8 @@ func (e *ExtenderServer) onPVAdd(obj interface{}) {
log.Infof("pv %s is in %s status, skipped", pv.Name, pv.Status.Phase)
return
}
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
// get node name
node := e.Ctx.ClusterNodeCache.GetNodeNameFromPV(pv)
if node == "" {
@@ -178,9 +179,6 @@ func (e *ExtenderServer) onPVAdd(obj interface{}) {
}

func (e *ExtenderServer) onPVDelete(obj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()

var pv *corev1.PersistentVolume
switch t := obj.(type) {
case *corev1.PersistentVolume:
@@ -209,6 +207,8 @@ func (e *ExtenderServer) onPVDelete(obj interface{}) {
return
}

e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
nc := e.Ctx.ClusterNodeCache.GetNodeCache(node)
if nc == nil {
log.Warningf("no node cache %s found", node)
@@ -248,9 +248,6 @@ func (e *ExtenderServer) onPVDelete(obj interface{}) {
e.Ctx.ClusterNodeCache.SetNodeCache(nc)
}
func (e *ExtenderServer) onPVUpdate(oldObj, newObj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()

var old *corev1.PersistentVolume
var pv *corev1.PersistentVolume
switch t := newObj.(type) {
@@ -271,6 +268,8 @@ func (e *ExtenderServer) onPVUpdate(oldObj, newObj interface{}) {
log.Infof("pv %s is not a local pv, skipped", pv.Name)
return
}
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
nc := e.Ctx.ClusterNodeCache.GetNodeCache(node)
if nc == nil {
log.Warningf("no node cache %s found", node)
@@ -310,9 +309,6 @@ func (e *ExtenderServer) onPVUpdate(oldObj, newObj interface{}) {
}

func (e *ExtenderServer) onPodAdd(obj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()

pod, ok := obj.(*corev1.Pod)
if !ok {
log.Errorf("cannot convert to *v1.Pod: %v", obj)
@@ -330,14 +326,12 @@ func (e *ExtenderServer) onPodAdd(obj interface{}) {
log.Infof("no open-local pvc found for %s", podName)
return
}

e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
e.Ctx.ClusterNodeCache.PvcMapping.PutPod(podName, pvcs)
}

func (e *ExtenderServer) onPodDelete(obj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()

var pod *corev1.Pod
switch t := obj.(type) {
case *corev1.Pod:
@@ -364,6 +358,8 @@ func (e *ExtenderServer) onPodDelete(obj interface{}) {
log.Infof("no open-local pvc found for %s", podName)
return
}
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
e.Ctx.ClusterNodeCache.PvcMapping.DeletePod(podName, pvcs)
}

@@ -434,20 +430,17 @@ func (e *ExtenderServer) onPodUpdate(_, newObj interface{}) {
}

func (e *ExtenderServer) onPvcAdd(obj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
pvc, ok := obj.(*corev1.PersistentVolumeClaim)
if !ok {
log.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", obj)
return
}
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
e.Ctx.ClusterNodeCache.PvcMapping.PutPvc(pvc)
}

func (e *ExtenderServer) onPvcDelete(obj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()

var pvc *corev1.PersistentVolumeClaim
switch t := obj.(type) {
case *corev1.PersistentVolumeClaim:
@@ -463,14 +456,13 @@ func (e *ExtenderServer) onPvcDelete(obj interface{}) {
log.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t)
return
}
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
e.Ctx.ClusterNodeCache.PvcMapping.DeletePvc(pvc)

}

func (e *ExtenderServer) onPvcUpdate(_, newObj interface{}) {
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()

//var old *corev1.PersistentVolumeClaim
var pvc *corev1.PersistentVolumeClaim
switch t := newObj.(type) {
@@ -481,5 +473,7 @@ func (e *ExtenderServer) onPvcUpdate(_, newObj interface{}) {
log.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t)
return
}
e.Ctx.CtxLock.Lock()
defer e.Ctx.CtxLock.Unlock()
e.Ctx.ClusterNodeCache.PvcMapping.PutPvc(pvc)
}
35 changes: 22 additions & 13 deletions pkg/scheduler/statussyncer/syncer.go
@@ -51,17 +51,20 @@ func NewStatusSyncer(recorder record.EventRecorder, c clientset.Interface, ctx *
}
}

func (syncer *StatusSyncer) OnUpdateInitialized(nls *nodelocalstorage.NodeLocalStorage) {
if need, err := syncer.isUpdateNeeded(nls); !need {
func (syncer *StatusSyncer) OnUpdateInitialized(nls *nodelocalstorage.NodeLocalStorage) error {
need, err := syncer.isUpdateNeeded(nls)
if err != nil {
return fmt.Errorf("[OnUpdateInitialized]check nls %s need update failed: %s", nls.Name, err.Error())
}
if !need {
log.Debugf("update status skipped")
if err == nil && nls.Status.FilteredStorageInfo.UpdateStatus.Status == nodelocalstorage.UpdateStatusFailed {
e := syncer.CleanStatus(nls)
if e != nil {
log.Errorf("CleanStatus failed: %s", e)
}
return nil
}
if err == nil && nls.Status.FilteredStorageInfo.UpdateStatus.Status == nodelocalstorage.UpdateStatusFailed {
e := syncer.CleanStatus(nls)
if e != nil {
return fmt.Errorf("[OnUpdateInitialized]CleanStatus failed: %s", e.Error())
}

return
}

nlsCopy := nls.DeepCopy()
@@ -73,10 +76,14 @@ func (syncer *StatusSyncer) OnUpdateInitialized(nls *nodelocalstorage.NodeLocalS
nlsCopy.Status.FilteredStorageInfo.UpdateStatus.Reason = ""

// only update status
_, err := syncer.client.CsiV1alpha1().NodeLocalStorages().UpdateStatus(context.Background(), nlsCopy, metav1.UpdateOptions{})
log.Infof("[OnUpdateInitialized]update nls %s", nlsCopy.Name)
_, err = syncer.client.CsiV1alpha1().NodeLocalStorages().UpdateStatus(context.Background(), nlsCopy, metav1.UpdateOptions{})
if err != nil {
log.Errorf("local storage CRD update Status FilteredStorageInfo error: %s", err.Error())
log.Debugf("[OnUpdateInitialized]nls %s resourceVersion: %v", nlsCopy.Name, nlsCopy.ResourceVersion)
return fmt.Errorf("[OnUpdateInitialized]local storage CRD update Status FilteredStorageInfo error: %s", err.Error())
}

return nil
}

// isUpdateNeeded will check whether .status.filteredStorageInfo of nls need update
@@ -181,9 +188,10 @@ func (syncer *StatusSyncer) UpdateFailedStatus(nls *nodelocalstorage.NodeLocalSt
nlsCopy.Status.FilteredStorageInfo.UpdateStatus.Reason = reason
nlsCopy.Status.FilteredStorageInfo.UpdateStatus.Status = nodelocalstorage.UpdateStatusFailed
// only update status
log.Infof("[UpdateFailedStatus]update nls %s", nlsCopy.Name)
_, err := syncer.client.CsiV1alpha1().NodeLocalStorages().UpdateStatus(context.Background(), nlsCopy, metav1.UpdateOptions{})
if err != nil {
log.Errorf("local storage CRD update Status FilteredStorageInfo error: %s", err.Error())
log.Errorf("[UpdateFailedStatus]local storage CRD update Status FilteredStorageInfo error: %s", err.Error())
return err
}
return nil
@@ -195,9 +203,10 @@ func (syncer *StatusSyncer) CleanStatus(nls *nodelocalstorage.NodeLocalStorage)
nlsCopy.Status.FilteredStorageInfo.UpdateStatus.Reason = ""
nlsCopy.Status.FilteredStorageInfo.UpdateStatus.LastUpdateTime = metav1.Now()
// only update status
log.Infof("[CleanStatus]update nls %s", nlsCopy.Name)
_, err := syncer.client.CsiV1alpha1().NodeLocalStorages().UpdateStatus(context.Background(), nlsCopy, metav1.UpdateOptions{})
if err != nil {
log.Errorf("local storage CRD update Status FilteredStorageInfo error: %s", err.Error())
log.Errorf("[CleanStatus]local storage CRD update Status FilteredStorageInfo error: %s", err.Error())
return err
}
return nil
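
The syncer.go changes are the other half of the fix: OnUpdateInitialized now returns an error, wrapping failures from isUpdateNeeded and from the UpdateStatus call with fmt.Errorf instead of only logging them, and onNodeLocalStorageUpdate checks that error before it touches the node cache. A self-contained sketch of that contract with placeholder types, not the project's clientset:

package main

import (
	"errors"
	"fmt"
)

type nls struct{ name string }

// syncStatus stands in for OnUpdateInitialized: failures are wrapped and
// returned to the caller instead of being swallowed after a log line.
func syncStatus(n *nls, updateStatus func(*nls) error) error {
	if err := updateStatus(n); err != nil {
		return fmt.Errorf("update status of nls %s failed: %s", n.name, err.Error())
	}
	return nil
}

// onUpdate stands in for the event handler: the status sync runs first,
// outside any lock, and the cache is only touched when it succeeds.
func onUpdate(n *nls, updateStatus func(*nls) error, cache map[string]*nls) {
	if err := syncStatus(n, updateStatus); err != nil {
		fmt.Println(err)
		return
	}
	cache[n.name] = n
}

func main() {
	cache := map[string]*nls{}
	failing := func(*nls) error { return errors.New("update conflict") }
	onUpdate(&nls{name: "node-1"}, failing, cache)
	fmt.Println(len(cache)) // 0: the cache was not modified after the failed sync
}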
