Skip to content

Commit

Permalink
remove version check for grpc. (#76)
Browse files Browse the repository at this point in the history
* remove version check for grpc.

Signed-off-by: morvencao <[email protected]>

* start resource version from 1.

Signed-off-by: morvencao <[email protected]>

---------

Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao authored Apr 16, 2024
1 parent 1e08a6a commit 1979f73
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 15 deletions.
8 changes: 3 additions & 5 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,9 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
if err != nil {
return nil, fmt.Errorf("failed to get resource: %v", err)
}
// handle the special case that the resource is updated by the source controller
// and the version of the resource in the request is less than it in the database
if found.Version < res.Version {
res.Version = found.Version
}
// keep the existing version for bundle resource, mainly from hub controller,
// the version is not guaranteed to be increased.
res.Version = found.Version
}
_, err := svr.resourceService.Update(ctx, res)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (d *Resource) BeforeCreate(tx *gorm.DB) error {
if d.ID == "" {
d.ID = NewID()
}
// start the resource version from 1
if d.Version == 0 {
d.Version = 1
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions test/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (helper *Helper) NewResource(consumerName string, replicas int) *api.Resour
ConsumerName: consumerName,
Type: api.ResourceTypeSingle,
Manifest: testManifest,
Version: 1,
}

return resource
Expand Down
119 changes: 119 additions & 0 deletions test/grpc_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/openshift-online/maestro/pkg/api"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
Expand Down Expand Up @@ -126,3 +128,120 @@ func (c *ResourceCodec) Decode(evt *cloudevents.Event) (*api.Resource, error) {

return resource, nil
}

type ResourceBundleCodec struct{}

var _ generic.Codec[*api.Resource] = &ResourceBundleCodec{}

func (c *ResourceBundleCodec) EventDataType() types.CloudEventsDataType {
return payload.ManifestBundleEventDataType
}

// encode the kubernetes resource to a cloudevent format
func (c *ResourceBundleCodec) Encode(source string, eventType types.CloudEventsType, resource *api.Resource) (*cloudevents.Event, error) {
if eventType.CloudEventsDataType != payload.ManifestBundleEventDataType {
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
}

eventBuilder := types.NewEventBuilder(source, eventType).
WithResourceID(resource.ID).
WithResourceVersion(int64(resource.Version)).
WithClusterName(resource.ConsumerName)

if !resource.GetDeletionTimestamp().IsZero() {
evt := eventBuilder.WithDeletionTimestamp(resource.GetDeletionTimestamp().Time).NewEvent()
return &evt, nil
}

manifest, err := api.DecodeManifest(resource.Manifest)
if err != nil {
return nil, fmt.Errorf("failed to decode manifest: %v", err)
}

evt := eventBuilder.NewEvent()

manifests := &payload.ManifestBundle{
Manifests: []workv1.Manifest{
{
RawExtension: runtime.RawExtension{
Object: &unstructured.Unstructured{Object: manifest},
},
},
},
DeleteOption: &workv1.DeleteOption{
PropagationPolicy: workv1.DeletePropagationPolicyTypeForeground,
},
ManifestConfigs: []workv1.ManifestConfigOption{},
}
if err := evt.SetData(cloudevents.ApplicationJSON, manifests); err != nil {
return nil, fmt.Errorf("failed to encode resource bundle to a cloudevent: %v", err)
}

return &evt, nil
}

func (c *ResourceBundleCodec) Decode(evt *cloudevents.Event) (*api.Resource, error) {
eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
return nil, fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
}

if eventType.CloudEventsDataType != payload.ManifestBundleEventDataType {
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
}

evtExtensions := evt.Context.GetExtensions()

resourceID, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionResourceID])
if err != nil {
return nil, fmt.Errorf("failed to get resourceid extension: %v", err)
}

resourceVersion, err := cloudeventstypes.ToInteger(evtExtensions[types.ExtensionResourceVersion])
if err != nil {
return nil, fmt.Errorf("failed to get resourceversion extension: %v", err)
}

clusterName, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionClusterName])
if err != nil {
return nil, fmt.Errorf("failed to get clustername extension: %v", err)
}

manifestStatus := &payload.ManifestBundleStatus{}
if err := evt.DataAs(manifestStatus); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}

resource := &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Version: resourceVersion,
ConsumerName: clusterName,
}

resourceStatus := &api.ResourceStatus{
ReconcileStatus: &api.ReconcileStatus{
ObservedVersion: resourceVersion,
},
}

if len(manifestStatus.ResourceStatus) > 0 {
resourceStatus.ReconcileStatus.Conditions = manifestStatus.ResourceStatus[0].Conditions
if meta.IsStatusConditionTrue(manifestStatus.Conditions, common.ManifestsDeleted) {
deletedCondition := meta.FindStatusCondition(manifestStatus.Conditions, common.ManifestsDeleted)
resourceStatus.ReconcileStatus.Conditions = append(resourceStatus.ReconcileStatus.Conditions, *deletedCondition)
}
}

resourceStatusJSON, err := json.Marshal(resourceStatus)
if err != nil {
return nil, fmt.Errorf("failed to marshal resource status: %v", err)
}
err = json.Unmarshal(resourceStatusJSON, &resource.Status)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal resource status: %v", err)
}

return resource, nil
}
13 changes: 11 additions & 2 deletions test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/event"
"github.com/openshift-online/maestro/pkg/logger"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
mqttoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
Expand Down Expand Up @@ -245,11 +246,18 @@ func (helper *Helper) StartControllerManager(ctx context.Context) {
go helper.ControllerManager.Start(ctx)
}

func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, mqttOptions *mqttoptions.MQTTOptions) {
func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, mqttOptions *mqttoptions.MQTTOptions, bundle bool) {
var workCodec generic.Codec[*workv1.ManifestWork]
if bundle {
workCodec = codec.NewManifestBundleCodec()
} else {
workCodec = codec.NewManifestCodec(nil)
}

clientHolder, err := work.NewClientHolderBuilder(mqttOptions).
WithClientID(clusterName).
WithClusterName(clusterName).
WithCodecs(codec.NewManifestCodec(nil)).
WithCodecs(workCodec).
NewAgentClientHolder(ctx)
if err != nil {
glog.Fatalf("Unable to create work agent holder: %s", err)
Expand All @@ -269,6 +277,7 @@ func (helper *Helper) StartGRPCResourceSourceClient() {
store,
resourceStatusHashGetter,
&ResourceCodec{},
&ResourceBundleCodec{},
)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/pulse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPulseServer(t *testing.T) {
consumer := h.CreateConsumer(clusterName)
res := h.CreateResource(consumer.Name, 1)
h.StartControllerManager(ctx)
h.StartWorkAgent(ctx, consumer.Name, h.Env().Config.MessageBroker.MQTTOptions)
h.StartWorkAgent(ctx, consumer.Name, h.Env().Config.MessageBroker.MQTTOptions, false)
clientHolder := h.WorkAgentHolder
informer := clientHolder.ManifestWorkInformer()
lister := informer.Lister().ManifestWorks(consumer.Name)
Expand Down
Loading

0 comments on commit 1979f73

Please sign in to comment.