diff --git a/cmd/maestro/server/grpc_server.go b/cmd/maestro/server/grpc_server.go index d0870add..c6473872 100644 --- a/cmd/maestro/server/grpc_server.go +++ b/cmd/maestro/server/grpc_server.go @@ -138,9 +138,12 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest) if err != nil { return nil, fmt.Errorf("failed to get resource: %v", err) } - // keep the existing version for bundle resource, mainly from hub controller, - // the version is not guaranteed to be increased. - res.Version = found.Version + + if res.Version == 0 { + // the resource version is not guaranteed to be increased by source client, + // using the latest resource version. + res.Version = found.Version + } } _, err := svr.resourceService.Update(ctx, res) if err != nil { diff --git a/examples/manifestworkclient/client-b/main.go b/examples/manifestworkclient/client-b/main.go index 7c925394..527b9ef5 100644 --- a/examples/manifestworkclient/client-b/main.go +++ b/examples/manifestworkclient/client-b/main.go @@ -3,7 +3,6 @@ package main import ( "context" "crypto/tls" - "encoding/json" "flag" "log" "net/http" @@ -11,7 +10,6 @@ import ( "fmt" - jsonpatch "github.com/evanphx/json-patch" "github.com/openshift-online/maestro/pkg/api/openapi" "github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource" @@ -92,7 +90,7 @@ func main() { newWork := work.DeepCopy() newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)} - patchData, err := ToWorkPatch(work, newWork) + patchData, err := grpcsource.ToWorkPatch(work, newWork) if err != nil { log.Fatal(err) } @@ -132,25 +130,6 @@ func NewManifestWork(name string) *workv1.ManifestWork { } } -func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) { - oldData, err := json.Marshal(old) - if err != nil { - return nil, err - } - - newData, err := json.Marshal(new) - if err != nil { - return nil, err - } - - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return nil, err - } - - return patchBytes, nil -} - func NewManifest(name string) workv1.Manifest { obj := &unstructured.Unstructured{ Object: map[string]interface{}{ diff --git a/go.mod b/go.mod index 36166454..e6b6c75a 100755 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( k8s.io/klog/v2 v2.120.1 open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 - open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2 + open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb ) require ( diff --git a/go.sum b/go.sum index 3075cfab..62549243 100755 --- a/go.sum +++ b/go.sum @@ -827,6 +827,8 @@ open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4= open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2 h1:HOvmoJ1CZF26EDkf4t53ZYztaSjM1LBFk1JuHTIffHU= open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= +open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb h1:mtEsdkNt92YmMR3jrAV7gRffSSfEsN091DPErNmMGJU= +open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y= sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk= diff --git a/pkg/client/cloudevents/grpcsource/util.go b/pkg/client/cloudevents/grpcsource/util.go index dd3ce696..7831c61b 100644 --- a/pkg/client/cloudevents/grpcsource/util.go +++ b/pkg/client/cloudevents/grpcsource/util.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + jsonpatch "github.com/evanphx/json-patch" "github.com/openshift-online/maestro/pkg/api/openapi" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -31,6 +32,8 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) { return nil, err } work.ObjectMeta = objectMeta + // use the maestro resource version as the work resource version + work.ObjectMeta.ResourceVersion = fmt.Sprintf("%d", *rb.Version) // get spec from resource manifests := []workv1.Manifest{} @@ -170,6 +173,56 @@ func ToLabelSearch(opts metav1.ListOptions) (labels.Selector, string, bool, erro return labelSelector, strings.Join(labelSearch, " and "), true, nil } +// ToWorkPatch returns a merge patch between an existing work and a new work. +// The patch will keep the resource version of an existing work, and only patch a work of +// labels, annotations, finalizers, owner references and spec. +func ToWorkPatch(existingWork, newWork *workv1.ManifestWork) ([]byte, error) { + existingWork = existingWork.DeepCopy() + + if existingWork.ResourceVersion == "" { + return nil, fmt.Errorf("the existing work resource version is not found") + } + + if existingWork.ResourceVersion == "0" { + return nil, fmt.Errorf("the existing work resource version cannot be zero") + } + + oldData, err := json.Marshal(&workv1.ManifestWork{ + ObjectMeta: metav1.ObjectMeta{ + Labels: existingWork.Labels, + Annotations: existingWork.Annotations, + Finalizers: existingWork.Finalizers, + OwnerReferences: existingWork.OwnerReferences, + }, + Spec: existingWork.Spec, + }) + if err != nil { + return nil, err + } + + newData, err := json.Marshal(&workv1.ManifestWork{ + ObjectMeta: metav1.ObjectMeta{ + UID: existingWork.UID, + ResourceVersion: existingWork.ResourceVersion, + Labels: newWork.Labels, + Annotations: newWork.Annotations, + Finalizers: newWork.Finalizers, + OwnerReferences: newWork.OwnerReferences, + }, + Spec: newWork.Spec, + }) + if err != nil { + return nil, err + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return nil, err + } + + return patchBytes, nil +} + func marshal(obj map[string]any) ([]byte, error) { unstructuredObj := unstructured.Unstructured{Object: obj} data, err := unstructuredObj.MarshalJSON() diff --git a/pkg/client/cloudevents/grpcsource/util_test.go b/pkg/client/cloudevents/grpcsource/util_test.go index 7e333d3c..8738d8b3 100644 --- a/pkg/client/cloudevents/grpcsource/util_test.go +++ b/pkg/client/cloudevents/grpcsource/util_test.go @@ -1,11 +1,13 @@ package grpcsource import ( + "encoding/json" "testing" "github.com/openshift-online/maestro/pkg/api/openapi" "k8s.io/apimachinery/pkg/api/equality" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -13,6 +15,8 @@ import ( ) func TestToManifestWork(t *testing.T) { + var version int32 = 1 + workload, err := marshal(map[string]interface{}{"a": "b"}) if err != nil { t.Fatal(err) @@ -30,6 +34,7 @@ func TestToManifestWork(t *testing.T) { "name": "test", "namespace": "testns", }, + Version: &version, Manifests: []map[string]interface{}{ {"a": "b"}, }, @@ -39,8 +44,9 @@ func TestToManifestWork(t *testing.T) { }, expected: &workv1.ManifestWork{ ObjectMeta: v1.ObjectMeta{ - Name: "test", - Namespace: "testns", + Name: "test", + Namespace: "testns", + ResourceVersion: "1", }, Spec: workv1.ManifestWorkSpec{ Workload: workv1.ManifestsTemplate{ @@ -62,6 +68,7 @@ func TestToManifestWork(t *testing.T) { "name": "test", "namespace": "testns", }, + Version: &version, Manifests: []map[string]interface{}{ {"a": "b"}, }, @@ -85,8 +92,9 @@ func TestToManifestWork(t *testing.T) { }, expected: &workv1.ManifestWork{ ObjectMeta: v1.ObjectMeta{ - Name: "test", - Namespace: "testns", + Name: "test", + Namespace: "testns", + ResourceVersion: "1", }, Spec: workv1.ManifestWorkSpec{ Workload: workv1.ManifestsTemplate{ @@ -206,3 +214,72 @@ func TestToLabelSearch(t *testing.T) { }) } } + +func TestToWorkPatch(t *testing.T) { + cases := []struct { + name string + existingWork *workv1.ManifestWork + newWork *workv1.ManifestWork + expectedVersion string + expectedError bool + }{ + { + name: "no resourceVersion", + existingWork: &workv1.ManifestWork{}, + expectedError: true, + }, + { + name: "resourceVersion is zero", + existingWork: &workv1.ManifestWork{ + ObjectMeta: v1.ObjectMeta{ + ResourceVersion: "0", + }, + }, + expectedError: true, + }, + { + name: "should use existing resource version", + existingWork: &workv1.ManifestWork{ + ObjectMeta: v1.ObjectMeta{ + ResourceVersion: "1", + }, + }, + newWork: &workv1.ManifestWork{ + ObjectMeta: v1.ObjectMeta{ + ResourceVersion: "2", + }, + }, + expectedVersion: "1", + expectedError: false, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + jsonData, err := ToWorkPatch(c.existingWork, c.newWork) + if c.expectedError { + if err == nil { + t.Errorf("expected error, but failed") + } + return + } + + if err != nil { + t.Errorf("unexpected error %v", err) + } + + metadata := map[string]any{} + if err := json.Unmarshal(jsonData, &metadata); err != nil { + t.Fatal(err) + } + + obj := unstructured.Unstructured{ + Object: metadata, + } + version := obj.GetResourceVersion() + if version != c.expectedVersion { + t.Errorf("expected %s, but got %s", c.expectedVersion, version) + } + }) + } +} diff --git a/test/e2e/pkg/sourceclient_test.go b/test/e2e/pkg/sourceclient_test.go index 855b264d..7e077f4f 100644 --- a/test/e2e/pkg/sourceclient_test.go +++ b/test/e2e/pkg/sourceclient_test.go @@ -2,12 +2,10 @@ package e2e_test import ( "context" - "encoding/json" "fmt" "strings" "time" - jsonpatch "github.com/evanphx/json-patch" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource" @@ -28,6 +26,54 @@ import ( ) var _ = Describe("gRPC Source ManifestWork Client Test", func() { + Context("Update an obsolete work", func() { + var workName string + + BeforeEach(func() { + workName = "work-" + rand.String(5) + work := NewManifestWork(workName) + _, err := workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + // wait for few seconds to ensure the creation is finished + <-time.After(5 * time.Second) + }) + + AfterEach(func() { + err := workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + Eventually(func() error { + return AssertWorkNotFound(workName) + }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + + }) + + It("Should return an error when updating an obsolete work", func() { + By("update a work by work client") + work, err := workClient.ManifestWorks(consumer.Name).Get(ctx, workName, metav1.GetOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + newWork := work.DeepCopy() + newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)} + patchData, err := grpcsource.ToWorkPatch(work, newWork) + Expect(err).ShouldNot(HaveOccurred()) + + _, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + By("update the work by work client again") + obsoleteWork := work.DeepCopy() + obsoleteWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)} + patchData, err = grpcsource.ToWorkPatch(work, obsoleteWork) + Expect(err).ShouldNot(HaveOccurred()) + + _, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) + Expect(err).Should(HaveOccurred()) + Expect(strings.Contains(err.Error(), "the resource version is not the latest")).Should(BeTrue()) + }) + }) + Context("Watch work status with gRPC source ManifestWork client", func() { var watcherCtx context.Context var watcherCancel context.CancelFunc @@ -98,8 +144,9 @@ var _ = Describe("gRPC Source ManifestWork Client Test", func() { newWork := work.DeepCopy() newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)} - patchData, err := ToWorkPatch(work, newWork) + patchData, err := grpcsource.ToWorkPatch(work, newWork) Expect(err).ShouldNot(HaveOccurred()) + _, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) Expect(err).ShouldNot(HaveOccurred()) @@ -473,25 +520,6 @@ func NewManifestWorkWithLabels(name string, labels map[string]string) *workv1.Ma return work } -func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) { - oldData, err := json.Marshal(old) - if err != nil { - return nil, err - } - - newData, err := json.Marshal(new) - if err != nil { - return nil, err - } - - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return nil, err - } - - return patchBytes, nil -} - func NewManifest(name string) workv1.Manifest { obj := &unstructured.Unstructured{ Object: map[string]interface{}{