Skip to content

Commit

Permalink
checking the version when update a resource with grpc
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jun 27, 2024
1 parent b7bddd5 commit 8164c92
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 51 deletions.
9 changes: 6 additions & 3 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,12 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
if err != nil {
return nil, fmt.Errorf("failed to get resource: %v", err)
}
// keep the existing version for bundle resource, mainly from hub controller,
// the version is not guaranteed to be increased.
res.Version = found.Version

if res.Version == 0 {
// the resource version is not guaranteed to be increased by source client,
// using the latest resource version.
res.Version = found.Version
}
}
_, err := svr.resourceService.Update(ctx, res)
if err != nil {
Expand Down
23 changes: 1 addition & 22 deletions examples/manifestworkclient/client-b/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package main
import (
"context"
"crypto/tls"
"encoding/json"
"flag"
"log"
"net/http"
"time"

"fmt"

jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift-online/maestro/pkg/api/openapi"
"github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource"

Expand Down Expand Up @@ -92,7 +90,7 @@ func main() {

newWork := work.DeepCopy()
newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)}
patchData, err := ToWorkPatch(work, newWork)
patchData, err := grpcsource.ToWorkPatch(work, newWork)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -132,25 +130,6 @@ func NewManifestWork(name string) *workv1.ManifestWork {
}
}

func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) {
oldData, err := json.Marshal(old)
if err != nil {
return nil, err
}

newData, err := json.Marshal(new)
if err != nil {
return nil, err
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, err
}

return patchBytes, nil
}

func NewManifest(name string) workv1.Manifest {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down
73 changes: 73 additions & 0 deletions pkg/client/cloudevents/grpcsource/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"fmt"
"strings"

jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift-online/maestro/pkg/api/openapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
)

Expand All @@ -32,6 +34,16 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) {
}
work.ObjectMeta = objectMeta

// use the maestro resource version as the work resource version
work.ObjectMeta.ResourceVersion = fmt.Sprintf("%d", *rb.Version)
if work.ObjectMeta.Annotations == nil {
work.ObjectMeta.Annotations = map[string]string{
common.CloudEventsResourceVersionAnnotationKey: fmt.Sprintf("%d", *rb.Version),
}
} else {
work.ObjectMeta.Annotations[common.CloudEventsResourceVersionAnnotationKey] = fmt.Sprintf("%d", *rb.Version)
}

// get spec from resource
manifests := []workv1.Manifest{}
for _, manifest := range rb.Manifests {
Expand Down Expand Up @@ -170,6 +182,67 @@ func ToLabelSearch(opts metav1.ListOptions) (labels.Selector, string, bool, erro
return labelSelector, strings.Join(labelSearch, " and "), true, nil
}

// ToWorkPatch returns a merge patch between an existing work and a new work.
// The patch will keep the resource version of an existing work, and only patch a work of
// labels, annotations, finalizers, owner references and spec.
func ToWorkPatch(existingWork, newWork *workv1.ManifestWork) ([]byte, error) {
existingWork = existingWork.DeepCopy()

resourceVersion, ok := existingWork.Annotations[common.CloudEventsResourceVersionAnnotationKey]
if !ok {
return nil, fmt.Errorf("the existing work resource version is not found")
}

if resourceVersion == "0" {
return nil, fmt.Errorf("the existing work resource version cannot be zero")
}

// the resource version is maintained by maestro server, so using the existing resource version
delete(existingWork.Annotations, common.CloudEventsResourceVersionAnnotationKey)

newAnnotations := newWork.Annotations
if newAnnotations == nil {
newAnnotations = map[string]string{
common.CloudEventsResourceVersionAnnotationKey: resourceVersion,
}
} else {
newAnnotations[common.CloudEventsResourceVersionAnnotationKey] = resourceVersion
}

oldData, err := json.Marshal(&workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Labels: existingWork.Labels,
Annotations: existingWork.Annotations,
Finalizers: existingWork.Finalizers,
OwnerReferences: existingWork.OwnerReferences,
},
Spec: existingWork.Spec,
})
if err != nil {
return nil, err
}

newData, err := json.Marshal(&workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Labels: newWork.Labels,
Annotations: newAnnotations,
Finalizers: newWork.Finalizers,
OwnerReferences: newWork.OwnerReferences,
},
Spec: newWork.Spec,
})
if err != nil {
return nil, err
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, err
}

return patchBytes, nil
}

func marshal(obj map[string]any) ([]byte, error) {
unstructuredObj := unstructured.Unstructured{Object: obj}
data, err := unstructuredObj.MarshalJSON()
Expand Down
98 changes: 94 additions & 4 deletions pkg/client/cloudevents/grpcsource/util_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package grpcsource

import (
"encoding/json"
"testing"

"github.com/openshift-online/maestro/pkg/api/openapi"
"k8s.io/apimachinery/pkg/api/equality"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"

workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
)

func TestToManifestWork(t *testing.T) {
var version int32 = 1

workload, err := marshal(map[string]interface{}{"a": "b"})
if err != nil {
t.Fatal(err)
Expand All @@ -30,6 +35,7 @@ func TestToManifestWork(t *testing.T) {
"name": "test",
"namespace": "testns",
},
Version: &version,
Manifests: []map[string]interface{}{
{"a": "b"},
},
Expand All @@ -39,8 +45,12 @@ func TestToManifestWork(t *testing.T) {
},
expected: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Name: "test",
Namespace: "testns",
Name: "test",
Namespace: "testns",
ResourceVersion: "1",
Annotations: map[string]string{
common.CloudEventsResourceVersionAnnotationKey: "1",
},
},
Spec: workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Expand All @@ -62,6 +72,7 @@ func TestToManifestWork(t *testing.T) {
"name": "test",
"namespace": "testns",
},
Version: &version,
Manifests: []map[string]interface{}{
{"a": "b"},
},
Expand All @@ -85,8 +96,12 @@ func TestToManifestWork(t *testing.T) {
},
expected: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Name: "test",
Namespace: "testns",
Name: "test",
Namespace: "testns",
ResourceVersion: "1",
Annotations: map[string]string{
common.CloudEventsResourceVersionAnnotationKey: "1",
},
},
Spec: workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Expand Down Expand Up @@ -206,3 +221,78 @@ func TestToLabelSearch(t *testing.T) {
})
}
}

func TestToWorkPatch(t *testing.T) {
cases := []struct {
name string
existingWork *workv1.ManifestWork
newWork *workv1.ManifestWork
expectedVersion string
expectedError bool
}{
{
name: "no resourceVersion",
existingWork: &workv1.ManifestWork{},
expectedError: true,
},
{
name: "resourceVersion is zero",
existingWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Annotations: map[string]string{
common.CloudEventsResourceVersionAnnotationKey: "0",
},
},
},
expectedError: true,
},
{
name: "should use existing resource version",
existingWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Annotations: map[string]string{
common.CloudEventsResourceVersionAnnotationKey: "1",
},
},
},
newWork: &workv1.ManifestWork{
ObjectMeta: v1.ObjectMeta{
Annotations: map[string]string{
common.CloudEventsResourceVersionAnnotationKey: "2",
},
},
},
expectedVersion: "1",
expectedError: false,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
jsonData, err := ToWorkPatch(c.existingWork, c.newWork)
if c.expectedError {
if err == nil {
t.Errorf("expected error, but failed")
}
return
}

if err != nil {
t.Errorf("unexpected error %v", err)
}

metadata := map[string]any{}
if err := json.Unmarshal(jsonData, &metadata); err != nil {
t.Fatal(err)
}

obj := unstructured.Unstructured{
Object: metadata,
}
version := obj.GetAnnotations()[common.CloudEventsResourceVersionAnnotationKey]
if version != c.expectedVersion {
t.Errorf("expected %s, but got %s", c.expectedVersion, version)
}
})
}
}
Loading

0 comments on commit 8164c92

Please sign in to comment.