Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: poc #3

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0b4e79d
feat: initial client setup
fxenik Apr 20, 2023
7b4825b
chore: cleanup PHONY targets in Makefile
fxenik May 29, 2023
50a01ae
chore: address several review comments
fxenik May 29, 2023
7f259d8
chore: address several review comments
fxenik May 29, 2023
d319418
feat: Poller implementation periodically fetches workspace configs
fxenik Jun 29, 2023
a700f7c
feat: WorkspaceConfigCache with support for update subscriptions
fxenik Jun 29, 2023
61c0e99
chore: mod name from rudder-control-plane-sdk to rudder-cp-sdk
achettyiitr Jan 3, 2024
38eadf1
chore: main pull
achettyiitr Jan 4, 2024
d32cbb3
Empty commit
achettyiitr Jan 4, 2024
0714f62
Merge branch 'main' of github.com:rudderlabs/rudder-cp-sdk into chore…
achettyiitr Jan 23, 2024
a63807f
chore: connections to be map for workspace config
achettyiitr Jan 23, 2024
0bb2123
chore: subscriber as separate pacakge
achettyiitr Jan 30, 2024
f501827
Merge branch 'main' of github.com:rudderlabs/rudder-cp-sdk into chore…
achettyiitr Jan 31, 2024
83e91dc
chore: module name to be as cp-sdk
achettyiitr Jan 31, 2024
7a2df50
chore: resolve dependencies
achettyiitr Jan 31, 2024
ecea43e
chore: making config as any
achettyiitr Jan 31, 2024
b63a268
chore: main pull
achettyiitr Jan 31, 2024
6bf7dcf
chore: source pull
achettyiitr Jan 31, 2024
ed5569b
Merge branch 'main' of github.com:rudderlabs/rudder-cp-sdk into chore…
achettyiitr Jan 31, 2024
9e5a00c
chore: poc
achettyiitr Feb 1, 2024
ec7c747
chore: poc
achettyiitr Feb 1, 2024
6f3bbd5
chore: comment not required info from workspace config
achettyiitr Feb 5, 2024
ceb8b5d
commented non required stuff
achettyiitr Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"github.com/rudderlabs/rudder-cp-sdk/modelv2/parser"
"sync"

"github.com/rudderlabs/rudder-cp-sdk/modelv2"
Expand Down Expand Up @@ -35,7 +36,9 @@ func (c *WorkspaceConfigCache) Get() *modelv2.WorkspaceConfigs {
// If a workspace config is nil in input, it will not be updated.
// Source and destination definitions are merged without removing any missing definitions.
// It notifies all subscribers of the update.
func (c *WorkspaceConfigCache) Set(configs *modelv2.WorkspaceConfigs) {
func (c *WorkspaceConfigCache) Set(data []byte) {
configs, _ := parser.Parse(data)

c.updateLock.Lock()
c.merge(configs)
c.updateLock.Unlock()
Expand Down
14 changes: 3 additions & 11 deletions internal/clients/namespace/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/rudderlabs/rudder-cp-sdk/identity"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/base"
"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/modelv2/parser"
)

type Client struct {
Expand All @@ -28,7 +26,7 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) {
return req, nil
}

func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) {
func (c *Client) GetWorkspaceConfigs(ctx context.Context) ([]byte, error) {
req, err := c.Get(ctx, "/data-plane/v2/namespaces/"+c.Identity.Namespace+"/config")
if err != nil {
return nil, err
Expand All @@ -38,15 +36,9 @@ func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceCon
if err != nil {
return nil, err
}

wcs, err := parser.Parse(data)
if err != nil {
return nil, err
}

return wcs, nil
return data, nil
}

func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) {
func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) ([]byte, error) {
return nil, errors.New("not implemented")
}
13 changes: 3 additions & 10 deletions internal/clients/workspace/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/rudderlabs/rudder-cp-sdk/identity"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/base"
"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/modelv2/parser"
)

type Client struct {
Expand All @@ -28,7 +26,7 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) {
return req, nil
}

func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) {
func (c *Client) GetWorkspaceConfigs(ctx context.Context) ([]byte, error) {
req, err := c.Get(ctx, "/data-plane/v2/workspaceConfig")
if err != nil {
return nil, err
Expand All @@ -39,14 +37,9 @@ func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceCon
return nil, err
}

wcs, err := parser.Parse(data)
if err != nil {
return nil, err
}

return wcs, nil
return data, nil
}

func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) {
func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) ([]byte, error) {
return nil, errors.New("not implemented")
}
9 changes: 4 additions & 5 deletions internal/poller/mocks/poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 3 additions & 31 deletions internal/poller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"time"

"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-go-kit/logger"
)

Expand All @@ -19,11 +18,11 @@ type Poller struct {
log logger.Logger
}

type WorkspaceConfigHandler func(*modelv2.WorkspaceConfigs) error
type WorkspaceConfigHandler func([]byte) error

type Client interface {
GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error)
GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAt time.Time) (*modelv2.WorkspaceConfigs, error)
GetWorkspaceConfigs(ctx context.Context) ([]byte, error)
GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAt time.Time) ([]byte, error)
}

func New(handler WorkspaceConfigHandler, opts ...Option) (*Poller, error) {
Expand Down Expand Up @@ -66,32 +65,5 @@ func (p *Poller) Start(ctx context.Context) {
}

func (p *Poller) poll(ctx context.Context) error {
var response *modelv2.WorkspaceConfigs
if p.updatedAt.IsZero() {
p.log.Debug("polling for workspace configs")
wcs, err := p.client.GetWorkspaceConfigs(ctx)
if err != nil {
return fmt.Errorf("failed to get workspace configs: %w", err)
}

response = wcs
} else {
p.log.Debugf("polling for workspace configs updated after %v", p.updatedAt)
wcs, err := p.client.GetUpdatedWorkspaceConfigs(ctx, p.updatedAt)
if err != nil {
return fmt.Errorf("failed to get updated workspace configs: %w", err)
}

response = wcs
}

if err := p.handler(response); err != nil {
return fmt.Errorf("failed to handle workspace configs: %w", err)
}

// only update updatedAt if we managed to handle the response
// so that we don't miss any updates in case of an error
p.updatedAt = response.UpdatedAt()

return nil
}
20 changes: 10 additions & 10 deletions modelv2/sources.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package modelv2

type Source struct {
Name string `json:"name"`
WriteKey string `json:"writeKey"`
Enabled bool `json:"enabled"`
Deleted bool `json:"deleted"`
Config map[string]interface{} `json:"config"`
Transient bool `json:"transient"`
DefinitionName string `json:"sourceDefinitionName"`
SecretVersion int `json:"secretVersion"`
AccountID string `json:"accountId"`
TrackingPlanConfig *DGSourceTrackingPlan `json:"dgSourceTrackingPlanConfig"`
Name string `json:"name"`
WriteKey string `json:"writeKey"`
Enabled bool `json:"enabled"`
Deleted bool `json:"deleted"`
Config any `json:"config"`
Transient bool `json:"transient"`
DefinitionName string `json:"sourceDefinitionName"`
SecretVersion int `json:"secretVersion"`
AccountID string `json:"accountId"`
TrackingPlanConfig *DGSourceTrackingPlan `json:"dgSourceTrackingPlanConfig"`
// TODO: consider adding the following fields
// CreatedBy string `json:"createdBy"`
// CreatedAt time.Time `json:"createdAt"`
Expand Down
15 changes: 5 additions & 10 deletions sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/namespace"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/workspace"
"github.com/rudderlabs/rudder-cp-sdk/internal/poller"
"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/notifications"
"github.com/rudderlabs/rudder-go-kit/logger"
)
Expand Down Expand Up @@ -42,8 +41,8 @@ type ControlPlane struct {
}

type Client interface {
GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error)
GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error)
GetWorkspaceConfigs(ctx context.Context) ([]byte, error)
GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) ([]byte, error)
}

type RequestDoer interface {
Expand Down Expand Up @@ -115,7 +114,7 @@ func (cp *ControlPlane) setupPoller() error {
return nil
}

handle := func(wc *modelv2.WorkspaceConfigs) error {
handle := func(wc []byte) error {
cp.configsCache.Set(wc)
return nil
}
Expand Down Expand Up @@ -146,12 +145,8 @@ func (cp *ControlPlane) Close() {
// GetWorkspaceConfigs returns the latest workspace configs.
// If polling is enabled, this will return the cached configs, if they have been retrieved at least once.
// Otherwise, it will make a request to the control plane to get the latest configs.
func (cp *ControlPlane) GetWorkspaceConfigs() (*modelv2.WorkspaceConfigs, error) {
if cp.poller != nil {
return cp.configsCache.Get(), nil
} else {
return cp.Client.GetWorkspaceConfigs(context.Background())
}
func (cp *ControlPlane) GetWorkspaceConfigs() ([]byte, error) {
return cp.Client.GetWorkspaceConfigs(context.Background())
}

type Subscriber interface {
Expand Down
Loading