Skip to content

Commit

Permalink
checking the version when update a resource with grpc (#148)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey authored Jul 1, 2024
1 parent 7732d3f commit 28d727f
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 52 deletions.
9 changes: 6 additions & 3 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,12 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
if err != nil {
return nil, fmt.Errorf("failed to get resource: %v", err)
}
// keep the existing version for bundle resource, mainly from hub controller,
// the version is not guaranteed to be increased.
res.Version = found.Version

if res.Version == 0 {
// the resource version is not guaranteed to be increased by source client,
// using the latest resource version.
res.Version = found.Version
}
}
_, err := svr.resourceService.Update(ctx, res)
if err != nil {
Expand Down
23 changes: 1 addition & 22 deletions examples/manifestworkclient/client-b/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package main
import (
"context"
"crypto/tls"
"encoding/json"
"flag"
"log"
"net/http"
"time"

"fmt"

jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift-online/maestro/pkg/api/openapi"
"github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource"

Expand Down Expand Up @@ -92,7 +90,7 @@ func main() {

newWork := work.DeepCopy()
newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err := ToWorkPatch(work, newWork)
patchData, err := grpcsource.ToWorkPatch(work, newWork)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -132,25 +130,6 @@ func NewManifestWork(name string) *workv1.ManifestWork {
}
}

func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) {
oldData, err := json.Marshal(old)
if err != nil {
return nil, err
}

newData, err := json.Marshal(new)
if err != nil {
return nil, err
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, err
}

return patchBytes, nil
}

func NewManifest(name string) workv1.Manifest {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
k8s.io/klog/v2 v2.120.1
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2
open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4=
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2 h1:HOvmoJ1CZF26EDkf4t53ZYztaSjM1LBFk1JuHTIffHU=
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb h1:mtEsdkNt92YmMR3jrAV7gRffSSfEsN091DPErNmMGJU=
open-cluster-management.io/sdk-go v0.14.1-0.20240628024040-cee4f104b1eb/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
Expand Down
53 changes: 53 additions & 0 deletions pkg/client/cloudevents/grpcsource/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift-online/maestro/pkg/api/openapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -31,6 +32,8 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) {
return nil, err
}
work.ObjectMeta = objectMeta
// use the maestro resource version as the work resource version
work.ObjectMeta.ResourceVersion = fmt.Sprintf("%d", *rb.Version)

// get spec from resource
manifests := []workv1.Manifest{}
Expand Down Expand Up @@ -170,6 +173,56 @@ func ToLabelSearch(opts metav1.ListOptions) (labels.Selector, string, bool, erro
return labelSelector, strings.Join(labelSearch, " and "), true, nil
}

// ToWorkPatch returns a merge patch between an existing work and a new work.
// The patch will keep the resource version of an existing work, and only patch a work of
// labels, annotations, finalizers, owner references and spec.
func ToWorkPatch(existingWork, newWork *workv1.ManifestWork) ([]byte, error) {
existingWork = existingWork.DeepCopy()

if existingWork.ResourceVersion == "" {
return nil, fmt.Errorf("the existing work resource version is not found")
}

if existingWork.ResourceVersion == "0" {
return nil, fmt.Errorf("the existing work resource version cannot be zero")
}

oldData, err := json.Marshal(&workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Labels: existingWork.Labels,
Annotations: existingWork.Annotations,
Finalizers: existingWork.Finalizers,
OwnerReferences: existingWork.OwnerReferences,
},
Spec: existingWork.Spec,
})
if err != nil {
return nil, err
}

newData, err := json.Marshal(&workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
UID: existingWork.UID,
ResourceVersion: existingWork.ResourceVersion,
Labels: newWork.Labels,
Annotations: newWork.Annotations,
Finalizers: newWork.Finalizers,
OwnerReferences: newWork.OwnerReferences,
},
Spec: newWork.Spec,
})
if err != nil {
return nil, err
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, err
}

return patchBytes, nil
}

func marshal(obj map[string]any) ([]byte, error) {
unstructuredObj := unstructured.Unstructured{Object: obj}
data, err := unstructuredObj.MarshalJSON()
Expand Down
85 changes: 81 additions & 4 deletions pkg/client/cloudevents/grpcsource/util_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package grpcsource

import (
"encoding/json"
"testing"

"github.com/openshift-online/maestro/pkg/api/openapi"
"k8s.io/apimachinery/pkg/api/equality"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"

workv1 "open-cluster-management.io/api/work/v1"
)

func TestToManifestWork(t *testing.T) {
var version int32 = 1

workload, err := marshal(map[string]interface{}{"a": "b"})
if err != nil {
t.Fatal(err)
Expand All @@ -30,6 +34,7 @@ func TestToManifestWork(t *testing.T) {
"name": "test",
"namespace": "testns",
},
Version: &version,
Manifests: []map[string]interface{}{
{"a": "b"},
},
Expand All @@ -39,8 +44,9 @@ func TestToManifestWork(t *testing.T) {
},
expected: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Name: "test",
Namespace: "testns",
Name: "test",
Namespace: "testns",
ResourceVersion: "1",
},
Spec: workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Expand All @@ -62,6 +68,7 @@ func TestToManifestWork(t *testing.T) {
"name": "test",
"namespace": "testns",
},
Version: &version,
Manifests: []map[string]interface{}{
{"a": "b"},
},
Expand All @@ -85,8 +92,9 @@ func TestToManifestWork(t *testing.T) {
},
expected: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Name: "test",
Namespace: "testns",
Name: "test",
Namespace: "testns",
ResourceVersion: "1",
},
Spec: workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Expand Down Expand Up @@ -206,3 +214,72 @@ func TestToLabelSearch(t *testing.T) {
})
}
}

func TestToWorkPatch(t *testing.T) {
cases := []struct {
name string
existingWork *workv1.ManifestWork
newWork *workv1.ManifestWork
expectedVersion string
expectedError bool
}{
{
name: "no resourceVersion",
existingWork: &workv1.ManifestWork{},
expectedError: true,
},
{
name: "resourceVersion is zero",
existingWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
ResourceVersion: "0",
},
},
expectedError: true,
},
{
name: "should use existing resource version",
existingWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
ResourceVersion: "1",
},
},
newWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
ResourceVersion: "2",
},
},
expectedVersion: "1",
expectedError: false,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
jsonData, err := ToWorkPatch(c.existingWork, c.newWork)
if c.expectedError {
if err == nil {
t.Errorf("expected error, but failed")
}
return
}

if err != nil {
t.Errorf("unexpected error %v", err)
}

metadata := map[string]any{}
if err := json.Unmarshal(jsonData, &metadata); err != nil {
t.Fatal(err)
}

obj := unstructured.Unstructured{
Object: metadata,
}
version := obj.GetResourceVersion()
if version != c.expectedVersion {
t.Errorf("expected %s, but got %s", c.expectedVersion, version)
}
})
}
}
72 changes: 50 additions & 22 deletions test/e2e/pkg/sourceclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package e2e_test

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

jsonpatch "github.com/evanphx/json-patch"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource"
Expand All @@ -28,6 +26,54 @@ import (
)

var _ = Describe("gRPC Source ManifestWork Client Test", func() {
Context("Update an obsolete work", func() {
var workName string

BeforeEach(func() {
workName = "work-" + rand.String(5)
work := NewManifestWork(workName)
_, err := workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())

// wait for few seconds to ensure the creation is finished
<-time.After(5 * time.Second)
})

AfterEach(func() {
err := workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())

Eventually(func() error {
return AssertWorkNotFound(workName)
}, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred())

})

It("Should return an error when updating an obsolete work", func() {
By("update a work by work client")
work, err := workClient.ManifestWorks(consumer.Name).Get(ctx, workName, metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())

newWork := work.DeepCopy()
newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err := grpcsource.ToWorkPatch(work, newWork)
Expect(err).ShouldNot(HaveOccurred())

_, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{})
Expect(err).ShouldNot(HaveOccurred())

By("update the work by work client again")
obsoleteWork := work.DeepCopy()
obsoleteWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err = grpcsource.ToWorkPatch(work, obsoleteWork)
Expect(err).ShouldNot(HaveOccurred())

_, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{})
Expect(err).Should(HaveOccurred())
Expect(strings.Contains(err.Error(), "the resource version is not the latest")).Should(BeTrue())
})
})

Context("Watch work status with gRPC source ManifestWork client", func() {
var watcherCtx context.Context
var watcherCancel context.CancelFunc
Expand Down Expand Up @@ -98,8 +144,9 @@ var _ = Describe("gRPC Source ManifestWork Client Test", func() {

newWork := work.DeepCopy()
newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err := ToWorkPatch(work, newWork)
patchData, err := grpcsource.ToWorkPatch(work, newWork)
Expect(err).ShouldNot(HaveOccurred())

_, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{})
Expect(err).ShouldNot(HaveOccurred())

Expand Down Expand Up @@ -473,25 +520,6 @@ func NewManifestWorkWithLabels(name string, labels map[string]string) *workv1.Ma
return work
}

func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) {
oldData, err := json.Marshal(old)
if err != nil {
return nil, err
}

newData, err := json.Marshal(new)
if err != nil {
return nil, err
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, err
}

return patchBytes, nil
}

func NewManifest(name string) workv1.Manifest {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down

0 comments on commit 28d727f

Please sign in to comment.