diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 6daf712..5e20be0 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -1,6 +1,7 @@ package cache import ( + "github.com/rudderlabs/rudder-cp-sdk/modelv2/parser" "sync" "github.com/rudderlabs/rudder-cp-sdk/modelv2" @@ -35,7 +36,9 @@ func (c *WorkspaceConfigCache) Get() *modelv2.WorkspaceConfigs { // If a workspace config is nil in input, it will not be updated. // Source and destination definitions are merged without removing any missing definitions. // It notifies all subscribers of the update. -func (c *WorkspaceConfigCache) Set(configs *modelv2.WorkspaceConfigs) { +func (c *WorkspaceConfigCache) Set(data []byte) { + configs, _ := parser.Parse(data) + c.updateLock.Lock() c.merge(configs) c.updateLock.Unlock() diff --git a/internal/clients/namespace/client.go b/internal/clients/namespace/client.go index 5f8d5ae..c272e36 100644 --- a/internal/clients/namespace/client.go +++ b/internal/clients/namespace/client.go @@ -8,8 +8,6 @@ import ( "github.com/rudderlabs/rudder-cp-sdk/identity" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/base" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/rudderlabs/rudder-cp-sdk/modelv2/parser" ) type Client struct { @@ -28,7 +26,7 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) { return req, nil } -func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) { +func (c *Client) GetWorkspaceConfigs(ctx context.Context) ([]byte, error) { req, err := c.Get(ctx, "/data-plane/v2/namespaces/"+c.Identity.Namespace+"/config") if err != nil { return nil, err @@ -38,15 +36,9 @@ func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceCon if err != nil { return nil, err } - - wcs, err := parser.Parse(data) - if err != nil { - return nil, err - } - - return wcs, nil + return data, nil } -func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) { +func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) ([]byte, error) { return nil, errors.New("not implemented") } diff --git a/internal/clients/workspace/client.go b/internal/clients/workspace/client.go index 53a8328..5fd08f6 100644 --- a/internal/clients/workspace/client.go +++ b/internal/clients/workspace/client.go @@ -8,8 +8,6 @@ import ( "github.com/rudderlabs/rudder-cp-sdk/identity" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/base" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/rudderlabs/rudder-cp-sdk/modelv2/parser" ) type Client struct { @@ -28,7 +26,7 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) { return req, nil } -func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) { +func (c *Client) GetWorkspaceConfigs(ctx context.Context) ([]byte, error) { req, err := c.Get(ctx, "/data-plane/v2/workspaceConfig") if err != nil { return nil, err @@ -39,14 +37,9 @@ func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceCon return nil, err } - wcs, err := parser.Parse(data) - if err != nil { - return nil, err - } - - return wcs, nil + return data, nil } -func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) { +func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) ([]byte, error) { return nil, errors.New("not implemented") } diff --git a/internal/poller/mocks/poller.go b/internal/poller/mocks/poller.go index 48ea0cb..a8b9ec3 100644 --- a/internal/poller/mocks/poller.go +++ b/internal/poller/mocks/poller.go @@ -10,7 +10,6 @@ import ( time "time" gomock "github.com/golang/mock/gomock" - modelv2 "github.com/rudderlabs/rudder-cp-sdk/modelv2" ) // MockClient is a mock of Client interface. @@ -37,10 +36,10 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder { } // GetUpdatedWorkspaceConfigs mocks base method. -func (m *MockClient) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAt time.Time) (*modelv2.WorkspaceConfigs, error) { +func (m *MockClient) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAt time.Time) ([]byte, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetUpdatedWorkspaceConfigs", ctx, updatedAt) - ret0, _ := ret[0].(*modelv2.WorkspaceConfigs) + ret0, _ := ret[0].([]byte) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -52,10 +51,10 @@ func (mr *MockClientMockRecorder) GetUpdatedWorkspaceConfigs(ctx, updatedAt inte } // GetWorkspaceConfigs mocks base method. -func (m *MockClient) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) { +func (m *MockClient) GetWorkspaceConfigs(ctx context.Context) ([]byte, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetWorkspaceConfigs", ctx) - ret0, _ := ret[0].(*modelv2.WorkspaceConfigs) + ret0, _ := ret[0].([]byte) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/internal/poller/poller.go b/internal/poller/poller.go index bd39e85..5e7e0c0 100644 --- a/internal/poller/poller.go +++ b/internal/poller/poller.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" "github.com/rudderlabs/rudder-go-kit/logger" ) @@ -19,11 +18,11 @@ type Poller struct { log logger.Logger } -type WorkspaceConfigHandler func(*modelv2.WorkspaceConfigs) error +type WorkspaceConfigHandler func([]byte) error type Client interface { - GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) - GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAt time.Time) (*modelv2.WorkspaceConfigs, error) + GetWorkspaceConfigs(ctx context.Context) ([]byte, error) + GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAt time.Time) ([]byte, error) } func New(handler WorkspaceConfigHandler, opts ...Option) (*Poller, error) { @@ -66,32 +65,5 @@ func (p *Poller) Start(ctx context.Context) { } func (p *Poller) poll(ctx context.Context) error { - var response *modelv2.WorkspaceConfigs - if p.updatedAt.IsZero() { - p.log.Debug("polling for workspace configs") - wcs, err := p.client.GetWorkspaceConfigs(ctx) - if err != nil { - return fmt.Errorf("failed to get workspace configs: %w", err) - } - - response = wcs - } else { - p.log.Debugf("polling for workspace configs updated after %v", p.updatedAt) - wcs, err := p.client.GetUpdatedWorkspaceConfigs(ctx, p.updatedAt) - if err != nil { - return fmt.Errorf("failed to get updated workspace configs: %w", err) - } - - response = wcs - } - - if err := p.handler(response); err != nil { - return fmt.Errorf("failed to handle workspace configs: %w", err) - } - - // only update updatedAt if we managed to handle the response - // so that we don't miss any updates in case of an error - p.updatedAt = response.UpdatedAt() - return nil } diff --git a/modelv2/sources.go b/modelv2/sources.go index 41ca15a..51794d1 100644 --- a/modelv2/sources.go +++ b/modelv2/sources.go @@ -1,16 +1,16 @@ package modelv2 type Source struct { - Name string `json:"name"` - WriteKey string `json:"writeKey"` - Enabled bool `json:"enabled"` - Deleted bool `json:"deleted"` - Config map[string]interface{} `json:"config"` - Transient bool `json:"transient"` - DefinitionName string `json:"sourceDefinitionName"` - SecretVersion int `json:"secretVersion"` - AccountID string `json:"accountId"` - TrackingPlanConfig *DGSourceTrackingPlan `json:"dgSourceTrackingPlanConfig"` + Name string `json:"name"` + WriteKey string `json:"writeKey"` + Enabled bool `json:"enabled"` + Deleted bool `json:"deleted"` + Config any `json:"config"` + Transient bool `json:"transient"` + DefinitionName string `json:"sourceDefinitionName"` + SecretVersion int `json:"secretVersion"` + AccountID string `json:"accountId"` + TrackingPlanConfig *DGSourceTrackingPlan `json:"dgSourceTrackingPlanConfig"` // TODO: consider adding the following fields // CreatedBy string `json:"createdBy"` // CreatedAt time.Time `json:"createdAt"` diff --git a/sdk.go b/sdk.go index c1cdd1b..d0723f1 100644 --- a/sdk.go +++ b/sdk.go @@ -14,7 +14,6 @@ import ( "github.com/rudderlabs/rudder-cp-sdk/internal/clients/namespace" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/workspace" "github.com/rudderlabs/rudder-cp-sdk/internal/poller" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" "github.com/rudderlabs/rudder-cp-sdk/notifications" "github.com/rudderlabs/rudder-go-kit/logger" ) @@ -42,8 +41,8 @@ type ControlPlane struct { } type Client interface { - GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) - GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) + GetWorkspaceConfigs(ctx context.Context) ([]byte, error) + GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) ([]byte, error) } type RequestDoer interface { @@ -115,7 +114,7 @@ func (cp *ControlPlane) setupPoller() error { return nil } - handle := func(wc *modelv2.WorkspaceConfigs) error { + handle := func(wc []byte) error { cp.configsCache.Set(wc) return nil } @@ -146,12 +145,8 @@ func (cp *ControlPlane) Close() { // GetWorkspaceConfigs returns the latest workspace configs. // If polling is enabled, this will return the cached configs, if they have been retrieved at least once. // Otherwise, it will make a request to the control plane to get the latest configs. -func (cp *ControlPlane) GetWorkspaceConfigs() (*modelv2.WorkspaceConfigs, error) { - if cp.poller != nil { - return cp.configsCache.Get(), nil - } else { - return cp.Client.GetWorkspaceConfigs(context.Background()) - } +func (cp *ControlPlane) GetWorkspaceConfigs() ([]byte, error) { + return cp.Client.GetWorkspaceConfigs(context.Background()) } type Subscriber interface {