Skip to content

Commit

Permalink
cr-syncer: Preserve observedGeneration semantics (#124)
Browse files Browse the repository at this point in the history
The status's observedGeneration field lets a CR creator identify when a
controller has updated the status to reflect a change in the spec.
However, because it's tied to the generation field, which is managed by
the apiserver, it can't be directly copied between two clusters.
Instead, we can preserve the difference between generation and
observedGeneration, allowing the CR creator to wait until
generation==observedGeneration before looking at the status.

I also needed to bump to Go 1.18 to use `any`.
  • Loading branch information
drigz authored Mar 29, 2023
1 parent 1c2898b commit c4d42a2
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/googlecloudrobotics/core/src

go 1.17
go 1.18

require (
cloud.google.com/go v0.105.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions src/go/cmd/cr-syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
embed = [":go_default_library"],
visibility = ["//visibility:private"],
deps = [
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_onsi_gomega//:go_default_library",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1:go_default_library",
"@io_k8s_apiextensions_apiserver//pkg/client/clientset/clientset/fake:go_default_library",
Expand Down
23 changes: 22 additions & 1 deletion src/go/cmd/cr-syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (s *crSyncer) syncDownstream(key string) error {

// Copy full status or subtree from src to dst.
if s.subtree == "" {
dst.Object["status"] = src.Object["status"]
copyStatus(dst, src)
} else if src.Object["status"] != nil {
srcStatus, ok := src.Object["status"].(map[string]interface{})
if !ok {
Expand Down Expand Up @@ -648,3 +648,24 @@ func deleteAnnotation(o *unstructured.Unstructured, key string) {
}

}

func copyStatus(dst, src *unstructured.Unstructured) {
dst.Object["status"] = src.DeepCopy().Object["status"]
// If this CR uses the observedGeneration convention, ensure that we
// preserve the **equality** between generation and observedGeneration,
// since the generations themselves will differ between local and remote.
srcStatus, ok := src.Object["status"].(map[string]interface{})
if !ok {
// Status is not a dict => no observedGeneration.
return
}
dstStatus := dst.Object["status"].(map[string]interface{})
if srcOG, ok := srcStatus["observedGeneration"].(int64); ok {
if src.GetGeneration() == srcOG {
dstStatus["observedGeneration"] = dst.GetGeneration()
} else {
// The controller of this CR has not observed the latest generation.
dstStatus["observedGeneration"] = 0
}
}
}
77 changes: 44 additions & 33 deletions src/go/cmd/cr-syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
crdtypes "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -124,40 +125,11 @@ func (f *fixture) verifyWriteActions() {
localWrites = filterReadActions(f.local.Actions())
remoteWrites = filterReadActions(f.remote.Actions())
)
if !reflect.DeepEqual(localWrites, f.localActions) {
f.Errorf("local writes did not match")
f.Logf("received:")
for i, a := range localWrites {
f.Logf("%d: %s", i, sprintAction(a))
}
f.Logf("expected:")
for i, a := range f.localActions {
f.Logf("%d: %s", i, sprintAction(a))
}
}
if !reflect.DeepEqual(remoteWrites, f.remoteActions) {
f.Errorf("remote writes did not match")
f.Logf("received:")
for i, a := range remoteWrites {
f.Logf("%d: %s", i, sprintAction(a))
}
f.Logf("expected:")
for i, a := range f.remoteActions {
f.Logf("%d: %s", i, sprintAction(a))
}
if diff := cmp.Diff(localWrites, f.localActions); diff != "" {
f.Errorf("local writes did not match (-want +got):\n%s", diff)
}
}

func sprintAction(a k8stest.Action) string {
switch v := a.(type) {
case k8stest.DeleteActionImpl:
return fmt.Sprintf("DELETE %s/%s %s/%s", v.Resource, v.Subresource, v.Namespace, v.Name)
case k8stest.CreateActionImpl:
return fmt.Sprintf("CREATE %s/%s %s/%s: %v", v.Resource, v.Subresource, v.Namespace, v.Name, v.Object.(*unstructured.Unstructured))
case k8stest.UpdateActionImpl:
return fmt.Sprintf("UPDATE %s/%s %s: %v", v.Resource, v.Subresource, v.Namespace, v.Object.(*unstructured.Unstructured))
default:
return fmt.Sprintf("<UNKNOWN ACTION %T>", a)
if diff := cmp.Diff(remoteWrites, f.remoteActions); diff != "" {
f.Errorf("remote writes did not match (-want +got):\n%s", diff)
}
}

Expand Down Expand Up @@ -376,6 +348,45 @@ func TestSyncDownstream_statusFull(t *testing.T) {
f.verifyWriteActions()
}

func TestSyncDownstream_statusWithObservedGeneration(t *testing.T) {
crd := testCRD(crdtypes.NamespaceScoped)
f := newFixture(t)

// If (and only if) generation==observedGeneration for the local resource,
// the cr-syncer should adjust observedGeneration to match for the remote
// resource:
// - local: generation = 3, observedGeneration = 3
// - remote (before test): generation = 2, observedGeneration = 1
// - remote (after test): generation = 2, observedGeneration = 2
tcrLocal := newTestCR("resource1", "spec1", map[string]any{"observedGeneration": int64(3)})
tcrRemote := newTestCR("resource1", "spec1", map[string]any{"observedGeneration": int64(1)})
tcrLocal.SetResourceVersion("123")
tcrLocal.SetGeneration(3)
tcrRemote.SetGeneration(2)

f.addLocalObjects(tcrLocal)
f.addRemoteObjects(tcrRemote)

crs, gvr := f.newCRSyncer(crd, "")
defer crs.stop()

crs.startInformers()
if err := crs.syncDownstream("default/resource1"); err != nil {
t.Fatal(err)
}

// Expect that tcrRemoteNew's observedGeneration is changed to match its
// generation.
tcrRemoteNew := newTestCR("resource1", "spec1", map[string]any{"observedGeneration": int64(2)})
tcrRemoteNew.SetGeneration(2)
tcrRemoteNew.SetAnnotations(map[string]string{
annotationResourceVersion: "123",
})

f.expectRemoteActions(k8stest.NewUpdateAction(gvr, "default", tcrRemoteNew))
f.verifyWriteActions()
}

func TestSyncDownstream_statusSubtree(t *testing.T) {
crd := testCRD(crdtypes.NamespaceScoped)
f := newFixture(t)
Expand Down
9 changes: 2 additions & 7 deletions src/go/cmd/token-vendor/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "go_default_library",
srcs = [
"main.go",
],
srcs = ["main.go"],
importpath = "github.com/googlecloudrobotics/core/src/go/cmd/token-vendor",
visibility = ["//visibility:private"],
deps = [
Expand All @@ -17,13 +15,10 @@ go_library(
"//src/go/cmd/token-vendor/repository/k8s:go_default_library",
"//src/go/cmd/token-vendor/repository/memory:go_default_library",
"//src/go/cmd/token-vendor/tokensource:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_client_go//kubernetes:go_default_library",
"@io_k8s_client_go//plugin/pkg/client/auth:go_default_library",
"@io_k8s_client_go//rest:go_default_library",
"@io_k8s_client_go//tools/clientcmd:go_default_library",
"@io_k8s_client_go//util/homedir:go_default_library",
],
)

Expand Down

0 comments on commit c4d42a2

Please sign in to comment.