Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updatefix #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 56 additions & 51 deletions go-controller/pkg/ovn/obj_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,46 +1515,51 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) (*factory.Handler
},

UpdateFunc: func(old, newer interface{}) {
// skip the whole update if old and newer are equal
areEqual, err := areResourcesEqual(objectsToRetry.oType, old, newer)
// get object key. API server guarantees that the key in old and newer are the same.
objKey, err := getResourceKey(objectsToRetry.oType, old)
if err != nil {
klog.Errorf("Could not compare old and newer resource objects of type %s: %v",
klog.Errorf("Update of %s failed when looking up key of old obj: %v",
objectsToRetry.oType, err)
return
}
klog.V(5).Infof("Update event received for resource %s, old object is equal to new: %t",
objectsToRetry.oType, areEqual)
if areEqual {
return
}
oc.recordUpdateEvent(objectsToRetry.oType, newer)

// get the object keys for newer and old (expected to be the same)
newKey, err := getResourceKey(objectsToRetry.oType, newer)
// Retrieve latest version of the object and skip the whole update if the object doesn't exist
// (rely on delete event for object removal)
latest, err := oc.getResourceFromInformerCache(objectsToRetry.oType, objKey)
if err != nil {
klog.Errorf("Update of %s failed when looking up key of new obj: %v",
objectsToRetry.oType, err)
// When processing an object in terminal state there is a chance that it was already removed from
// the API server. Since delete events for objects in terminal state are skipped delete it here.
// This only applies to pod watchers (pods + dynamic network policy handlers watching pods).
if kerrors.IsNotFound(err) && oc.isObjectInTerminalState(objectsToRetry.oType, newer) {
klog.Warningf("%s %s is in terminal state but no longer exists in informer cache, removing",
objectsToRetry.oType, objKey)
oc.processObjectInTerminalState(objectsToRetry, newer, objKey, resourceEventUpdate)
} else {
klog.Warningf("Unable to get %s %s from informer cache (perhaps it was already"+
" deleted?), skipping update: %v", objectsToRetry.oType, objKey, err)
}
return
}
oldKey, err := getResourceKey(objectsToRetry.oType, old)
// fast-forward to the latest version of the object
newer = latest
// skip the whole update if old and newer are equal
areEqual, err := areResourcesEqual(objectsToRetry.oType, old, newer)
if err != nil {
klog.Errorf("Update of %s failed when looking up key of old obj: %v",
klog.Errorf("Could not compare old and newer resource objects of type %s: %v",
objectsToRetry.oType, err)
return
}

// skip the whole update if the new object doesn't exist anymore in the API server
newer, err = oc.getResourceFromInformerCache(objectsToRetry.oType, newKey)
if err != nil {
klog.Warningf("Unable to get %s %s from informer cache (perhaps it was already"+
" deleted?), skipping update: %v", objectsToRetry.oType, newKey, err)
klog.V(5).Infof("Update event received for resource %s, old object is equal to new: %t",
objectsToRetry.oType, areEqual)
if areEqual {
return
}
oc.recordUpdateEvent(objectsToRetry.oType, newer)

klog.V(5).Infof("Update event received for %s, oldKey=%s, newKey=%s",
objectsToRetry.oType, oldKey, newKey)
klog.V(5).Infof("Update event received for %s, objKey=%s",
objectsToRetry.oType, objKey)

objectsToRetry.skipRetryObj(newKey)
objectsToRetry.skipRetryObj(objKey)
hasUpdateFunc := hasResourceAnUpdateFunc(objectsToRetry.oType)

// STEP 1:
Expand All @@ -1566,28 +1571,28 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) (*factory.Handler
// Note it is okay to access retryEntry without lock here, as the entry is either
// accessed by iterateRetryResources or in add/update/delete handler of this resource;
// the former is prevented as its ignore field is set to true, and the latter is serialized.
retryEntry := objectsToRetry.getObjRetryEntry(oldKey)
retryEntry := objectsToRetry.getObjRetryEntry(objKey)
if retryEntry != nil && retryEntry.oldObj != nil {
// [step 1a] there is a retry entry marked for deletion
klog.Infof("Found retry entry for %s %s marked for deletion: will delete the object",
objectsToRetry.oType, oldKey)
objectsToRetry.oType, objKey)
if err := oc.deleteResource(objectsToRetry, retryEntry.oldObj,
retryEntry.config); err != nil {
klog.Errorf("Failed to delete stale object %s, during update: %v", oldKey, err)
klog.Errorf("Failed to delete stale object %s, during update: %v", objKey, err)
oc.recordErrorEvent(objectsToRetry.oType, retryEntry.oldObj, err)
objectsToRetry.initRetryObjWithAdd(newer, newKey)
objectsToRetry.unSkipRetryObj(newKey)
objectsToRetry.increaseFailedAttemptsCounter(newKey)
objectsToRetry.initRetryObjWithAdd(newer, objKey)
objectsToRetry.unSkipRetryObj(objKey)
objectsToRetry.increaseFailedAttemptsCounter(objKey)
return
}
// remove the old object from retry entry since it was correctly deleted
objectsToRetry.removeDeleteFromRetryObj(oldKey)
objectsToRetry.removeDeleteFromRetryObj(objKey)

} else if oc.isObjectInTerminalState(objectsToRetry.oType, newer) { // check the latest status on newer
// [step 1b] The object is in a terminal state: delete it from the cluster,
// delete its retry entry and return. This only applies to pod watchers
// (pods + dynamic network policy handlers watching pods).
oc.processObjectInTerminalState(objectsToRetry, newer, newKey, resourceEventUpdate)
oc.processObjectInTerminalState(objectsToRetry, newer, objKey, resourceEventUpdate)
return

} else if !hasUpdateFunc {
Expand All @@ -1597,51 +1602,51 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) (*factory.Handler
if retryEntry != nil {
existingCacheEntry = retryEntry.config
}
klog.Infof("Deleting old %s of type %s during update", oldKey, objectsToRetry.oType)
klog.Infof("Deleting old %s of type %s during update", objKey, objectsToRetry.oType)
if err := oc.deleteResource(objectsToRetry, old, existingCacheEntry); err != nil {
klog.Errorf("Failed to delete %s %s, during update: %v",
objectsToRetry.oType, oldKey, err)
objectsToRetry.oType, objKey, err)
oc.recordErrorEvent(objectsToRetry.oType, old, err)
objectsToRetry.initRetryObjWithDelete(old, oldKey, nil, false)
objectsToRetry.initRetryObjWithAdd(newer, newKey)
objectsToRetry.unSkipRetryObj(oldKey)
objectsToRetry.increaseFailedAttemptsCounter(oldKey)
objectsToRetry.initRetryObjWithDelete(old, objKey, nil, false)
objectsToRetry.initRetryObjWithAdd(newer, objKey)
objectsToRetry.unSkipRetryObj(objKey)
objectsToRetry.increaseFailedAttemptsCounter(objKey)
return
}
// remove the old object from retry entry since it was correctly deleted
objectsToRetry.removeDeleteFromRetryObj(oldKey)
objectsToRetry.removeDeleteFromRetryObj(objKey)
}

// STEP 2:
// Execute the update function for this resource type; resort to add if no update
// function is available.
if hasUpdateFunc {
// if this resource type has an update func, just call the update function
if err := oc.updateResource(objectsToRetry, old, newer, objectsToRetry.checkRetryObj(newKey)); err != nil {
klog.Errorf("Failed to update %s, old=%s, new=%s, error: %v",
objectsToRetry.oType, oldKey, newKey, err)
if err := oc.updateResource(objectsToRetry, old, newer, objectsToRetry.checkRetryObj(objKey)); err != nil {
klog.Errorf("Failed to update %s, key=%s, error: %v",
objectsToRetry.oType, objKey, err)
oc.recordErrorEvent(objectsToRetry.oType, newer, err)
if resourceNeedsUpdate(objectsToRetry.oType) {
objectsToRetry.initRetryObjWithUpdate(old, newer, newKey)
objectsToRetry.initRetryObjWithUpdate(old, newer, objKey)
} else {
objectsToRetry.initRetryObjWithAdd(newer, newKey)
objectsToRetry.initRetryObjWithAdd(newer, objKey)
}
objectsToRetry.unSkipRetryObj(newKey)
objectsToRetry.increaseFailedAttemptsCounter(newKey)
objectsToRetry.unSkipRetryObj(objKey)
objectsToRetry.increaseFailedAttemptsCounter(objKey)
return
}
} else { // we previously deleted old object, now let's add the new one
if err := oc.addResource(objectsToRetry, newer, false); err != nil {
oc.recordErrorEvent(objectsToRetry.oType, newer, err)
objectsToRetry.initRetryObjWithAdd(newer, newKey)
objectsToRetry.unSkipRetryObj(newKey)
objectsToRetry.increaseFailedAttemptsCounter(newKey)
objectsToRetry.initRetryObjWithAdd(newer, objKey)
objectsToRetry.unSkipRetryObj(objKey)
objectsToRetry.increaseFailedAttemptsCounter(objKey)
klog.Errorf("Failed to add %s %s, during update: %v",
objectsToRetry.oType, newKey, err)
objectsToRetry.oType, objKey, err)
return
}
}
objectsToRetry.deleteRetryObj(newKey, true)
objectsToRetry.deleteRetryObj(objKey, true)
oc.recordSuccessEvent(objectsToRetry.oType, newer)

},
Expand Down