Skip to content

Commit

Permalink
Support Option-Style API
Browse files Browse the repository at this point in the history
Support API-Style Options
  • Loading branch information
jnan806 committed Feb 14, 2023
1 parent 24aa528 commit 2817d9b
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 15 deletions.
56 changes: 48 additions & 8 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,78 @@ import (
"github.com/opensergo/opensergo-go/pkg/transport/subscribe"
)

type ClientOptions struct {
connectRetryTimes uint
}

type ClientOption func(*ClientOptions)

func (clientOptions *ClientOptions) ApplyClientOptions(opts ...ClientOption) {
if len(opts) > 0 {
for _, opt := range opts {
opt(clientOptions)
}
}
}

func NewDefaultClientOptions() *ClientOptions {
return &ClientOptions{
connectRetryTimes: 3,
}
}

func (opts *ClientOptions) ConnectRetryTimes() uint {
return opts.connectRetryTimes
}

func WithConnectRetryTimes(connectRetryTimes uint) ClientOption {
return func(opts *ClientOptions) {
opts.connectRetryTimes = connectRetryTimes
}
}

// SubscribeOptions represents the options of OpenSergo data subscription.
type SubscribeOptions struct {
Subscribers []subscribe.Subscriber
Attachments map[string]interface{}
subscribers []subscribe.Subscriber
attachments map[string]interface{}
}

func (opts *SubscribeOptions) Subscribers() []subscribe.Subscriber {
return opts.subscribers
}

func (opts *SubscribeOptions) Attachments() map[string]interface{} {
return opts.attachments
}

type SubscribeOption func(*SubscribeOptions)

// WithSubscriber provides a subscriber.
func WithSubscriber(subscriber subscribe.Subscriber) SubscribeOption {
return func(opts *SubscribeOptions) {
if opts.Subscribers == nil {
opts.Subscribers = make([]subscribe.Subscriber, 0)
if opts.subscribers == nil {
opts.subscribers = make([]subscribe.Subscriber, 0)
}
opts.Subscribers = append(opts.Subscribers, subscriber)
opts.subscribers = append(opts.subscribers, subscriber)
}
}

// WithAttachment provides an attachment (key-value pair).
func WithAttachment(key string, value interface{}) SubscribeOption {
return func(opts *SubscribeOptions) {
if opts.Attachments == nil {
opts.Attachments = make(map[string]interface{})
if opts.attachments == nil {
opts.attachments = make(map[string]interface{})
}
opts.Attachments[key] = value
opts.attachments[key] = value
}
}

// OpenSergoClient is the universal interface of OpenSergo client.
type OpenSergoClient interface {
// Start the client.
Start() error
// Close the client.
Close() error
// SubscribeConfig subscribes data for given subscribe target.
SubscribeConfig(key model.SubscribeKey, opts ...SubscribeOption) error
// UnsubscribeConfig unsubscribes data for given subscribe target.
Expand Down
16 changes: 11 additions & 5 deletions pkg/client/opensergo_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (

// OpenSergoClient is the client to communicate with opensergo-control-plane.
type OpenSergoClient struct {
host string
port uint32
host string
port uint32
clientOptions *api.ClientOptions

transportServiceClient transportPb.OpenSergoUniversalTransportServiceClient

subscribeConfigStreamPtr atomic.Value // type of value is *client.subscribeConfigStream
Expand All @@ -48,7 +50,10 @@ type OpenSergoClient struct {
}

// NewOpenSergoClient returns an instance of OpenSergoClient, and init some properties.
func NewOpenSergoClient(host string, port uint32) (*OpenSergoClient, error) {
func NewOpenSergoClient(host string, port uint32, opts ...api.ClientOption) (*OpenSergoClient, error) {
clientOptions := api.NewDefaultClientOptions()
clientOptions.ApplyClientOptions(opts...)

address := host + ":" + strconv.FormatUint(uint64(port), 10)
clientConn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
Expand All @@ -59,6 +64,7 @@ func NewOpenSergoClient(host string, port uint32) (*OpenSergoClient, error) {
openSergoClient := &OpenSergoClient{
host: host,
port: port,
clientOptions: clientOptions,
transportServiceClient: transportServiceClient,
subscribeDataCache: &subscribe.SubscribeDataCache{},
subscriberRegistry: &subscribe.SubscriberRegistry{},
Expand Down Expand Up @@ -156,8 +162,8 @@ func (c *OpenSergoClient) SubscribeConfig(subscribeKey model.SubscribeKey, opts
}

// Register subscribers.
if len(options.Subscribers) > 0 {
for _, subscriber := range options.Subscribers {
if len(options.Subscribers()) > 0 {
for _, subscriber := range options.Subscribers() {
c.subscriberRegistry.RegisterSubscriber(subscribeKey, subscriber)
}
}
Expand Down
5 changes: 3 additions & 2 deletions samples/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package main

import (
"log"

"github.com/opensergo/opensergo-go/pkg/client"
"github.com/opensergo/opensergo-go/pkg/common/logging"
"log"

"github.com/opensergo/opensergo-go/pkg/api"
"github.com/opensergo/opensergo-go/pkg/configkind"
Expand Down Expand Up @@ -45,7 +46,7 @@ func StartAndSubscribeOpenSergoConfig() error {
// logging.NewFileLogger("./opensergo-universal-transport-service.log", logging.InfoLevel, logging.JsonFormat, true)

// Create a OpenSergoClient.
openSergoClient, err := client.NewOpenSergoClient("127.0.0.1", 10246)
openSergoClient, err := client.NewOpenSergoClient("127.0.0.1", 10246, api.WithConnectRetryTimes(5))
if err != nil {
return err
}
Expand Down

0 comments on commit 2817d9b

Please sign in to comment.