Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checking the version when update a resource with grpc #148

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,12 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
if err != nil {
return nil, fmt.Errorf("failed to get resource: %v", err)
}
// keep the existing version for bundle resource, mainly from hub controller,
// the version is not guaranteed to be increased.
res.Version = found.Version

if res.Version == 0 {
// the resource version is not guaranteed to be increased by source client,
// using the latest resource version.
res.Version = found.Version
}
}
_, err := svr.resourceService.Update(ctx, res)
if err != nil {
Expand Down
23 changes: 1 addition & 22 deletions examples/manifestworkclient/client-b/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package main
import (
"context"
"crypto/tls"
"encoding/json"
"flag"
"log"
"net/http"
"time"

"fmt"

jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift-online/maestro/pkg/api/openapi"
"github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource"

Expand Down Expand Up @@ -92,7 +90,7 @@ func main() {

newWork := work.DeepCopy()
newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err := ToWorkPatch(work, newWork)
patchData, err := grpcsource.ToWorkPatch(work, newWork)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -132,25 +130,6 @@ func NewManifestWork(name string) *workv1.ManifestWork {
}
}

func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) {
oldData, err := json.Marshal(old)
if err != nil {
return nil, err
}

newData, err := json.Marshal(new)
if err != nil {
return nil, err
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, err
}

return patchBytes, nil
}

func NewManifest(name string) workv1.Manifest {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
k8s.io/klog/v2 v2.120.1
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2
open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4=
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2 h1:HOvmoJ1CZF26EDkf4t53ZYztaSjM1LBFk1JuHTIffHU=
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb h1:mtEsdkNt92YmMR3jrAV7gRffSSfEsN091DPErNmMGJU=
open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
Expand Down
53 changes: 53 additions & 0 deletions pkg/client/cloudevents/grpcsource/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift-online/maestro/pkg/api/openapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -31,6 +32,8 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) {
return nil, err
}
work.ObjectMeta = objectMeta
// use the maestro resource version as the work resource version
work.ObjectMeta.ResourceVersion = fmt.Sprintf("%d", *rb.Version)

// get spec from resource
manifests := []workv1.Manifest{}
Expand Down Expand Up @@ -170,6 +173,56 @@ func ToLabelSearch(opts metav1.ListOptions) (labels.Selector, string, bool, erro
return labelSelector, strings.Join(labelSearch, " and "), true, nil
}

// ToWorkPatch returns a merge patch between an existing work and a new work.
// The patch will keep the resource version of an existing work, and only patch a work of
// labels, annotations, finalizers, owner references and spec.
func ToWorkPatch(existingWork, newWork *workv1.ManifestWork) ([]byte, error) {
existingWork = existingWork.DeepCopy()

if existingWork.ResourceVersion == "" {
return nil, fmt.Errorf("the existing work resource version is not found")
}

if existingWork.ResourceVersion == "0" {
return nil, fmt.Errorf("the existing work resource version cannot be zero")
}

oldData, err := json.Marshal(&workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Labels: existingWork.Labels,
Annotations: existingWork.Annotations,
Finalizers: existingWork.Finalizers,
OwnerReferences: existingWork.OwnerReferences,
},
Spec: existingWork.Spec,
})
if err != nil {
return nil, err
}

newData, err := json.Marshal(&workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
UID: existingWork.UID,
ResourceVersion: existingWork.ResourceVersion,
Labels: newWork.Labels,
Annotations: newWork.Annotations,
Finalizers: newWork.Finalizers,
OwnerReferences: newWork.OwnerReferences,
},
Spec: newWork.Spec,
})
if err != nil {
return nil, err
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, err
}

return patchBytes, nil
}

func marshal(obj map[string]any) ([]byte, error) {
unstructuredObj := unstructured.Unstructured{Object: obj}
data, err := unstructuredObj.MarshalJSON()
Expand Down
85 changes: 81 additions & 4 deletions pkg/client/cloudevents/grpcsource/util_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package grpcsource

import (
"encoding/json"
"testing"

"github.com/openshift-online/maestro/pkg/api/openapi"
"k8s.io/apimachinery/pkg/api/equality"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"

workv1 "open-cluster-management.io/api/work/v1"
)

func TestToManifestWork(t *testing.T) {
var version int32 = 1

workload, err := marshal(map[string]interface{}{"a": "b"})
if err != nil {
t.Fatal(err)
Expand All @@ -30,6 +34,7 @@ func TestToManifestWork(t *testing.T) {
"name": "test",
"namespace": "testns",
},
Version: &version,
Manifests: []map[string]interface{}{
{"a": "b"},
},
Expand All @@ -39,8 +44,9 @@ func TestToManifestWork(t *testing.T) {
},
expected: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Name: "test",
Namespace: "testns",
Name: "test",
Namespace: "testns",
ResourceVersion: "1",
},
Spec: workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Expand All @@ -62,6 +68,7 @@ func TestToManifestWork(t *testing.T) {
"name": "test",
"namespace": "testns",
},
Version: &version,
Manifests: []map[string]interface{}{
{"a": "b"},
},
Expand All @@ -85,8 +92,9 @@ func TestToManifestWork(t *testing.T) {
},
expected: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Name: "test",
Namespace: "testns",
Name: "test",
Namespace: "testns",
ResourceVersion: "1",
},
Spec: workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Expand Down Expand Up @@ -206,3 +214,72 @@ func TestToLabelSearch(t *testing.T) {
})
}
}

func TestToWorkPatch(t *testing.T) {
cases := []struct {
name string
existingWork *workv1.ManifestWork
newWork *workv1.ManifestWork
expectedVersion string
expectedError bool
}{
{
name: "no resourceVersion",
existingWork: &workv1.ManifestWork{},
expectedError: true,
},
{
name: "resourceVersion is zero",
existingWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
ResourceVersion: "0",
},
},
expectedError: true,
},
{
name: "should use existing resource version",
existingWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
ResourceVersion: "1",
},
},
newWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
ResourceVersion: "2",
},
},
expectedVersion: "1",
expectedError: false,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
jsonData, err := ToWorkPatch(c.existingWork, c.newWork)
if c.expectedError {
if err == nil {
t.Errorf("expected error, but failed")
}
return
}

if err != nil {
t.Errorf("unexpected error %v", err)
}

metadata := map[string]any{}
if err := json.Unmarshal(jsonData, &metadata); err != nil {
t.Fatal(err)
}

obj := unstructured.Unstructured{
Object: metadata,
}
version := obj.GetResourceVersion()
if version != c.expectedVersion {
t.Errorf("expected %s, but got %s", c.expectedVersion, version)
}
})
}
}
72 changes: 50 additions & 22 deletions test/e2e/pkg/sourceclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package e2e_test

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

jsonpatch "github.com/evanphx/json-patch"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource"
Expand All @@ -28,6 +26,54 @@ import (
)

var _ = Describe("gRPC Source ManifestWork Client Test", func() {
Context("Update an obsolete work", func() {
var workName string

BeforeEach(func() {
workName = "work-" + rand.String(5)
work := NewManifestWork(workName)
_, err := workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())

// wait for few seconds to ensure the creation is finished
<-time.After(5 * time.Second)
})

AfterEach(func() {
err := workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())

Eventually(func() error {
return AssertWorkNotFound(workName)
}, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred())

})

It("Should return an error when updating an obsolete work", func() {
By("update a work by work client")
work, err := workClient.ManifestWorks(consumer.Name).Get(ctx, workName, metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())

newWork := work.DeepCopy()
newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err := grpcsource.ToWorkPatch(work, newWork)
Expect(err).ShouldNot(HaveOccurred())

_, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{})
Expect(err).ShouldNot(HaveOccurred())

By("update the work by work client again")
obsoleteWork := work.DeepCopy()
obsoleteWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err = grpcsource.ToWorkPatch(work, obsoleteWork)
Expect(err).ShouldNot(HaveOccurred())

_, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{})
Expect(err).Should(HaveOccurred())
Expect(strings.Contains(err.Error(), "the resource version is not the latest")).Should(BeTrue())
})
})

Context("Watch work status with gRPC source ManifestWork client", func() {
var watcherCtx context.Context
var watcherCancel context.CancelFunc
Expand Down Expand Up @@ -98,8 +144,9 @@ var _ = Describe("gRPC Source ManifestWork Client Test", func() {

newWork := work.DeepCopy()
newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err := ToWorkPatch(work, newWork)
patchData, err := grpcsource.ToWorkPatch(work, newWork)
Expect(err).ShouldNot(HaveOccurred())

_, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{})
Expect(err).ShouldNot(HaveOccurred())

Expand Down Expand Up @@ -473,25 +520,6 @@ func NewManifestWorkWithLabels(name string, labels map[string]string) *workv1.Ma
return work
}

func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) {
oldData, err := json.Marshal(old)
if err != nil {
return nil, err
}

newData, err := json.Marshal(new)
if err != nil {
return nil, err
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, err
}

return patchBytes, nil
}

func NewManifest(name string) workv1.Manifest {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down
Loading