Skip to content

Commit

Permalink
sync works when the watcher creating
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jun 21, 2024
1 parent 622f2ae commit c8325d8
Show file tree
Hide file tree
Showing 14 changed files with 561 additions and 164 deletions.
4 changes: 2 additions & 2 deletions cmd/maestro/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewAgentCommand() *cobra.Command {
agentOption.MaxJSONRawLength = maxJSONRawLength
agentOption.CloudEventsClientCodecs = []string{"manifest", "manifestbundle"}
cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption)
cmdConfig := commonOptions.CommoOpts.
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent)

cmd := cmdConfig.NewCommandWithContext(context.TODO())
Expand Down Expand Up @@ -68,6 +68,6 @@ func NewAgentCommand() *cobra.Command {
func addFlags(fs *pflag.FlagSet) {
fs.StringVar(&commonOptions.SpokeClusterName, "consumer-name",
commonOptions.SpokeClusterName, "Name of the consumer")
fs.BoolVar(&commonOptions.CommoOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
fs.BoolVar(&commonOptions.CommonOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
true, "Disable leader election.")
}
19 changes: 8 additions & 11 deletions examples/manifestworkclient/README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
# gRPC Source ManifestWork Client

This example shows how to build a source ManifestWork client with `RESTFullAPIWatcherStore` and watch/create/get/update/delete works by the client.
This example shows how to build a source ManifestWork client with Maestro gRPC service and watch/create/get/update/delete works by this client.

## Build the client

Using sdk-go to build a source ManifestWork client with `RESTFullAPIWatcherStore`

```golang
sourceID := "mw-client-example"

workClient, err := work.NewClientHolderBuilder(grpcOptions).
WithClientID(fmt.Sprintf("%s-client", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(ctx, maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient(
ctx,
maestroAPIClient,
maestroGRPCOptions,
sourceID,
)

if err != nil {
log.Fatal(err)
log.Fatal(err)
}

// watch/create/patch/get/delete/list by workClient
Expand Down
15 changes: 6 additions & 9 deletions examples/manifestworkclient/client-a/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (

workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

const sourceID = "mw-client-example"
Expand Down Expand Up @@ -71,13 +69,12 @@ func main() {
grpcOptions := grpc.NewGRPCOptions()
grpcOptions.URL = *grpcServerAddr

workClient, err := work.NewClientHolderBuilder(grpcOptions).
WithClientID(fmt.Sprintf("%s-client-a", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient(
ctx,
maestroAPIClient,
grpcOptions,
sourceID,
)
if err != nil {
log.Fatal(err)
}
Expand Down
15 changes: 6 additions & 9 deletions examples/manifestworkclient/client-b/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
workv1 "open-cluster-management.io/api/work/v1"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

const sourceID = "mw-client-example"
Expand Down Expand Up @@ -67,13 +65,12 @@ func main() {
grpcOptions := grpc.NewGRPCOptions()
grpcOptions.URL = *grpcServerAddr

workClient, err := work.NewClientHolderBuilder(grpcOptions).
WithClientID(fmt.Sprintf("%s-client-b", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient(
ctx,
maestroAPIClient,
grpcOptions,
sourceID,
)
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ require (
k8s.io/component-base v0.29.3
k8s.io/klog/v2 v2.120.1
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -823,10 +823,10 @@ k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0g
k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZRFphYtDXBE7ApBNlSnj1RNazhW+8F01XYYg=
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0=
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4 h1:Z3gwbMUZmblGTHFx6iOO1zlCJc2FJcAJsa2RASTKDqA=
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4/go.mod h1:RuYCuKuVJzNxRBkSoQnxyJxyUqOyCH388DlR/QDr7rE=
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6 h1:/nPyxceSdi66Vs+A9LJ+7X6kLe4xK98dBMi4adZv1d0=
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x25QZIzfZqeSFfZdNrzc2hlHm6t/JKYKu9fI=
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4=
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd h1:kTVZOR7bTdh4ID7EoliyGhPR5CItpx8GehN581IxoPA=
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
Expand Down
88 changes: 88 additions & 0 deletions pkg/client/cloudevents/grpcsource/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package grpcsource

import (
"context"
"fmt"

"github.com/openshift-online/maestro/pkg/api/openapi"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workv1 "open-cluster-management.io/api/work/v1"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client"
sourcecodec "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

func NewMaestroGRPCSourceWorkClient(
ctx context.Context,
apiClient *openapi.APIClient,
opts *grpc.GRPCOptions,
sourceID string,
) (workv1client.WorkV1Interface, error) {
if len(sourceID) == 0 {
return nil, fmt.Errorf("source id is required")
}

options, err := generic.BuildCloudEventsSourceOptions(opts, fmt.Sprintf("%s-maestro", sourceID), sourceID)
if err != nil {
return nil, err
}

watcherStore := newRESTFulAPIWatcherStore(ctx, apiClient, sourceID)

cloudEventsClient, err := generic.NewCloudEventSourceClient[*workv1.ManifestWork](
ctx,
options,
nil, // resync is disabled, so lister is not required
nil, // resync is disabled, so status hash is not required
sourcecodec.NewManifestBundleCodec(),
)
if err != nil {
return nil, err
}

cloudEventsClient.Subscribe(ctx, watcherStore.HandleReceivedWork)

// start a go routine to receive client reconnect signal
go func() {
for {
select {
case <-ctx.Done():
return
case <-cloudEventsClient.ReconnectedChan():
// reconnect happened, sync the works for current watchers
if err := watcherStore.Sync(); err != nil {
klog.Errorf("failed to sync the works %v", err)
}
}
}
}()

manifestWorkClient := sourceclient.NewManifestWorkSourceClient(sourceID, cloudEventsClient, watcherStore)
return &WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}, nil

}

// WorkV1ClientWrapper wraps a ManifestWork client to a WorkV1Interface
type WorkV1ClientWrapper struct {
ManifestWorkClient *sourceclient.ManifestWorkSourceClient
}

var _ workv1client.WorkV1Interface = &WorkV1ClientWrapper{}

func (c *WorkV1ClientWrapper) ManifestWorks(namespace string) workv1client.ManifestWorkInterface {
c.ManifestWorkClient.SetNamespace(namespace)
return c.ManifestWorkClient
}

func (c *WorkV1ClientWrapper) AppliedManifestWorks() workv1client.AppliedManifestWorkInterface {
return nil
}

func (c *WorkV1ClientWrapper) RESTClient() rest.Interface {
return nil
}
79 changes: 79 additions & 0 deletions pkg/client/cloudevents/grpcsource/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package grpcsource

import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
workv1 "open-cluster-management.io/api/work/v1"
)

// workWatcher implements the watch.Interface.
type workWatcher struct {
sync.RWMutex

result chan watch.Event
done chan struct{}
stopped bool

namespace string
}

var _ watch.Interface = &workWatcher{}

func newWorkWatcher(namespace string) *workWatcher {
return &workWatcher{
result: make(chan watch.Event),
done: make(chan struct{}),
namespace: namespace,
}
}

// ResultChan implements Interface.
func (w *workWatcher) ResultChan() <-chan watch.Event {
return w.result
}

// Stop implements Interface.
func (w *workWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
w.Lock()
defer w.Unlock()
// closing a closed channel always panics, therefore check before closing
select {
case <-w.done:
close(w.result)
default:
w.stopped = true
close(w.done)
}
}

// Receive an event and sends down the result channel.
func (w *workWatcher) Receive(evt watch.Event) {
if w.isStopped() {
// this watcher is stopped, do nothing.
return
}

work, ok := evt.Object.(*workv1.ManifestWork)
if !ok {
klog.Errorf("unknown event object type %T", evt.Object)
return
}

if w.namespace != metav1.NamespaceAll && w.namespace != work.Namespace {
klog.V(4).Infof("ignore the work %s/%s for the watcher %s", work.Namespace, work.Name, w.namespace)
return
}

w.result <- evt
}

func (w *workWatcher) isStopped() bool {
w.RLock()
defer w.RUnlock()

return w.stopped
}
Loading

0 comments on commit c8325d8

Please sign in to comment.