diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 99090b5..932887d 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -4,7 +4,7 @@ ## Linear Ticket -< Replace_with_Linear_Link > +< Linear_Link > ## Security diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7dca8d1..86728d1 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -14,7 +14,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: - go-version: '~1.21' + go-version: '~1.23' check-latest: true - run: go version - run: go mod download diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index fe673d9..92cd84a 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -16,7 +16,7 @@ jobs: - uses: actions/setup-go@v5 with: check-latest: true - go-version: '~1.21' + go-version: '~1.23' - run: go version - run: go mod tidy @@ -38,10 +38,10 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: - go-version: '~1.21' + go-version: '~1.23' check-latest: true - name: golangci-lint uses: golangci/golangci-lint-action@v6 with: - version: v1.55.2 + version: v1.62.2 args: -v diff --git a/.golangci.yml b/.golangci.yml index 098929f..b53c66f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,6 @@ run: timeout: 7m - go: '1.21' + go: '1.23' linters: enable: diff --git a/Makefile b/Makefile index 1e68195..5e23c66 100644 --- a/Makefile +++ b/Makefile @@ -50,7 +50,7 @@ install-tools: go install github.com/golang/mock/mockgen@v1.6.0 go install mvdan.cc/gofumpt@latest go install golang.org/x/tools/cmd/goimports@latest - bash ./scripts/install-golangci-lint.sh v1.55.0 + bash ./scripts/install-golangci-lint.sh v1.62.2 .PHONY: lint lint: fmt ## Run linters on all go files diff --git a/benchmark_test.go b/benchmark_test.go index 5f5715e..f7bb665 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -3,6 +3,7 @@ package cpsdk import ( "context" "testing" + "time" "github.com/stretchr/testify/require" @@ -10,10 +11,9 @@ import ( "github.com/rudderlabs/rudder-go-kit/logger" ) -// BenchmarkGetWorkspaceConfigs on free-us-1, last time it showed 371MB of allocations with this technique func BenchmarkGetWorkspaceConfigs(b *testing.B) { conf := config.New() - baseURL := conf.GetString("BASE_URL", "https://api.rudderstack.com") + baseURL := conf.GetString("BASE_URL", "https://dp.api.rudderstack.com/") namespace := conf.GetString("NAMESPACE", "free-us-1") identity := conf.GetString("IDENTITY", "") if identity == "" { @@ -24,39 +24,15 @@ func BenchmarkGetWorkspaceConfigs(b *testing.B) { cpSDK, err := New( WithBaseUrl(baseURL), WithLogger(logger.NOP), - WithPollingInterval(0), // Setting the poller interval to 0 to disable the poller WithNamespaceIdentity(namespace, identity), ) require.NoError(b, err) - defer cpSDK.Close() - - _, err = cpSDK.GetWorkspaceConfigs(context.Background()) - require.NoError(b, err) -} - -// BenchmarkGetCustomWorkspaceConfigs on free-us-1, last time it showed 88MB of allocations with this technique -func BenchmarkGetCustomWorkspaceConfigs(b *testing.B) { - conf := config.New() - baseURL := conf.GetString("BASE_URL", "https://api.rudderstack.com") - namespace := conf.GetString("NAMESPACE", "free-us-1") - identity := conf.GetString("IDENTITY", "") - if identity == "" { - b.Skip("IDENTITY is not set") - return - } - - cpSDK, err := New( - WithBaseUrl(baseURL), - WithLogger(logger.NOP), - WithPollingInterval(0), // Setting the poller interval to 0 to disable the poller - WithNamespaceIdentity(namespace, identity), - ) - require.NoError(b, err) - defer cpSDK.Close() var workspaceConfigs WorkspaceConfigs - err = cpSDK.GetCustomWorkspaceConfigs(context.Background(), &workspaceConfigs) + err = cpSDK.GetWorkspaceConfigs(context.Background(), &workspaceConfigs, time.Time{}) require.NoError(b, err) + require.NotNil(b, workspaceConfigs) + require.Greater(b, len(workspaceConfigs.Workspaces), 0) } type WorkspaceConfigs struct { diff --git a/cmd/sampleapp/main.go b/cmd/sampleapp/main.go index f5b52be..ef22487 100644 --- a/cmd/sampleapp/main.go +++ b/cmd/sampleapp/main.go @@ -4,28 +4,36 @@ import ( "context" "fmt" "os" + "os/signal" + "sync" + "time" cpsdk "github.com/rudderlabs/rudder-cp-sdk" + "github.com/rudderlabs/rudder-cp-sdk/diff" "github.com/rudderlabs/rudder-cp-sdk/identity" + "github.com/rudderlabs/rudder-cp-sdk/modelv2" + "github.com/rudderlabs/rudder-cp-sdk/poller" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" + obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" ) -var log logger.Logger - func main() { - // setup a logger using the rudder-go-kit package + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer cancel() + c := config.New() loggerFactory := logger.NewFactory(c) - log = loggerFactory.NewLogger() + log := loggerFactory.NewLogger() - if err := run(); err != nil { - log.Fatalf("main error: %v", err) + if err := run(ctx, log); err != nil { + log.Fataln("main error", obskit.Error(err)) + os.Exit(1) } } // setupControlPlaneSDK creates a new SDK instance using the environment variables -func setupControlPlaneSDK() (*cpsdk.ControlPlane, error) { +func setupControlPlaneSDK(log logger.Logger) (*cpsdk.ControlPlane, error) { apiUrl := os.Getenv("CPSDK_API_URL") workspaceToken := os.Getenv("CPSDK_WORKSPACE_TOKEN") namespace := os.Getenv("CPSDK_NAMESPACE") @@ -59,18 +67,88 @@ func setupControlPlaneSDK() (*cpsdk.ControlPlane, error) { return cpsdk.New(options...) } -// run is the main function that uses the SDK -func run() error { - sdk, err := setupControlPlaneSDK() +func setupWorkspaceConfigsPoller[K comparable]( + getter poller.WorkspaceConfigsGetter[K], + handler poller.WorkspaceConfigsHandler[K], + constructor func() diff.UpdateableObject[K], + log logger.Logger, +) (*poller.WorkspaceConfigsPoller[K], error) { + return poller.NewWorkspaceConfigsPoller(getter, handler, constructor, + poller.WithLogger[K](log.Child("poller")), + poller.WithPollingInterval[K](1*time.Second), + poller.WithPollingBackoffInitialInterval[K](1*time.Second), + poller.WithPollingBackoffMaxInterval[K](1*time.Minute), + poller.WithPollingMaxElapsedTime[K](5*time.Minute), + poller.WithPollingMaxRetries[K](15), + poller.WithPollingBackoffMultiplier[K](1.5), + poller.WithOnResponse[K](func(updated bool, err error) { + if err != nil { + // e.g. bump metric on failure + log.Errorn("failed to poll workspace configs", obskit.Error(err)) + } else { + // e.g. bump metric on success + log.Debugn("successfully polled workspace configs", logger.NewBoolField("updated", updated)) + } + }), + ) +} + +func setupClientWithPoller( + cache diff.UpdateableObject[string], + cacheMu *sync.RWMutex, + log logger.Logger, +) (func(context.Context), error) { + sdk, err := setupControlPlaneSDK(log) if err != nil { - return fmt.Errorf("error setting up control plane sdk: %v", err) + return nil, fmt.Errorf("error setting up control plane sdk: %v", err) } - defer sdk.Close() - _, err = sdk.Client.GetWorkspaceConfigs(context.Background()) + updater := diff.Updater[string]{} + + p, err := setupWorkspaceConfigsPoller[string]( + func(ctx context.Context, l diff.UpdateableObject[string], updatedAfter time.Time) error { + return sdk.GetWorkspaceConfigs(ctx, l, updatedAfter) + }, + func(obj diff.UpdateableObject[string]) (time.Time, bool, error) { + cacheMu.Lock() + defer cacheMu.Unlock() + return updater.UpdateCache(obj, cache) + }, + func() diff.UpdateableObject[string] { + return &modelv2.WorkspaceConfigs{} + }, + log, + ) if err != nil { - return fmt.Errorf("error getting workspace configs: %v", err) + return nil, fmt.Errorf("error setting up poller: %v", err) } + return p.Run, nil +} + +// run is the main function that uses the SDK +func run(ctx context.Context, log logger.Logger) error { + var ( + // WARNING: if you don't want to use modelv2.WorkspaceConfigs because you're interested in a smaller subset of + // the data, then have a look at diff_test.go for an example of how to implement a custom UpdateableList. + cache = &modelv2.WorkspaceConfigs{} + cacheMu = &sync.RWMutex{} + ) + + poll, err := setupClientWithPoller(cache, cacheMu, log) + if err != nil { + return fmt.Errorf("error setting up client with poller: %v", err) + } + + pollingDone := make(chan struct{}) + go func() { + poll(ctx) // blocking call, runs until context is cancelled + close(pollingDone) + }() + + cacheMu.RLock() + // do something with "cache" here + cacheMu.RUnlock() // nolint:staticcheck + return nil } diff --git a/diff/diff.go b/diff/diff.go new file mode 100644 index 0000000..558047c --- /dev/null +++ b/diff/diff.go @@ -0,0 +1,133 @@ +package diff + +import ( + "fmt" + "iter" + "time" +) + +type UpdateableObject[K comparable] interface { + Updateables() iter.Seq[UpdateableList[K, UpdateableElement]] + NonUpdateables() iter.Seq[NonUpdateablesList[K, any]] +} + +type UpdateableElement interface { + GetUpdatedAt() time.Time + IsNil() bool +} + +type UpdateableList[K comparable, T UpdateableElement] interface { + Type() string + Length() int + Reset() + List() iter.Seq2[K, T] + GetElementByKey(id K) (T, bool) + SetElementByKey(id K, object T) +} + +type NonUpdateablesList[K comparable, T any] interface { + Type() string + Reset() + SetElementByKey(id K, object T) + List() iter.Seq2[K, T] +} + +type Updater[K comparable] struct { + latestUpdatedAt time.Time +} + +func (u *Updater[K]) UpdateCache(new, cache UpdateableObject[K]) (time.Time, bool, error) { + var ( + atLeastOneUpdate bool + latestUpdatedAt time.Time + ) + + for n := range new.Updateables() { + var ( + found, updated bool + c UpdateableList[K, UpdateableElement] + ) + for c = range cache.Updateables() { + if n.Type() == c.Type() { + found = true + break + } + } + if !found { + return time.Time{}, false, fmt.Errorf(`cannot find updateable list of type %q in cache`, n.Type()) + } + + for k, v := range n.List() { + if v.IsNil() { + cachedValue, ok := c.GetElementByKey(k) + if !ok { + return time.Time{}, false, fmt.Errorf(`value "%v" in %q was not updated but was not present in cache`, k, n.Type()) + } + + if cachedValue.IsNil() { + return time.Time{}, false, fmt.Errorf(`value "%v" in %q was not updated but was nil in cache`, k, n.Type()) + } + + n.SetElementByKey(k, cachedValue) + + continue + } + + updated = true // at least one value in "new" was not null, thus it was updated + + if v.GetUpdatedAt().After(latestUpdatedAt) { + latestUpdatedAt = v.GetUpdatedAt() + } + } + + if updated { + atLeastOneUpdate = true + + c.Reset() + // we need to iterate over the cache too because we can't simply do "cache = new", since it's an interface + // it won't persist after the function ends. + for k, v := range n.List() { + c.SetElementByKey(k, v) + } + } + } + + if err := u.replaceNonUpdateables(new, cache); err != nil { + return time.Time{}, false, err + } + + // only update updatedAt if we managed to handle the response + // so that we don't miss any updates in case of an error + if !latestUpdatedAt.IsZero() { + // There is a case where all workspaces have not been updated since the last request. + // In that case updatedAt will be zero. + u.latestUpdatedAt = latestUpdatedAt + } + + return u.latestUpdatedAt, atLeastOneUpdate, nil +} + +func (u *Updater[K]) replaceNonUpdateables(new, cache UpdateableObject[K]) error { + for n := range new.NonUpdateables() { + var ( + found bool + c NonUpdateablesList[K, any] + ) + for c = range cache.NonUpdateables() { + if n.Type() == c.Type() { + found = true + break + } + } + if !found { + return fmt.Errorf(`cannot find non updateable list of type %q in cache`, n.Type()) + } + + c.Reset() + for k, v := range n.List() { + c.SetElementByKey(k, v) + } + } + + return nil +} diff --git a/diff/diff_test.go b/diff/diff_test.go new file mode 100644 index 0000000..824bcfd --- /dev/null +++ b/diff/diff_test.go @@ -0,0 +1,340 @@ +package diff + +import ( + stdjson "encoding/json" + "iter" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestUpdateable(t *testing.T) { + // Get the data from the first call and make sure it unmarshals correctly + firstCall, err := os.ReadFile("./testdata/call_01.json") + require.NoError(t, err) + + var response UpdateableObject[string] = &WorkspaceConfigs{} + err = stdjson.Unmarshal(firstCall, &response) + require.NoError(t, err) + { + workspaces := getWorkspaces(response) + require.Len(t, workspaces, 2) + require.Contains(t, workspaces, "workspace1") + require.Contains(t, workspaces, "workspace2") + require.Equal(t, goldenWorkspace1, workspaces["workspace1"]) + require.Equal(t, goldenWorkspace2, workspaces["workspace2"]) + srcDefinitions := getSourceDefinitions(response) + require.Len(t, srcDefinitions, 1) + require.Contains(t, srcDefinitions, "close_crm") + require.Equal(t, &SourceDefinition{Name: "Close CRM"}, srcDefinitions["close_crm"]) + dstDefinitions := getDestinationDefinitions(response) + require.Len(t, dstDefinitions, 0) + } + + // Update the cache with the first call response and make sure the workspaces are correct + var cache UpdateableObject[string] = &WorkspaceConfigs{} + updater := &Updater[string]{} + updateAfter, updated, err := updater.UpdateCache(response, cache) + require.NoError(t, err) + require.True(t, updated) + require.Equal(t, time.Date(2021, 9, 1, 6, 6, 6, 0, time.UTC), updateAfter) + { + workspaces := getWorkspaces(cache) + require.Len(t, workspaces, 2) + require.Contains(t, workspaces, "workspace1") + require.Contains(t, workspaces, "workspace2") + require.Equal(t, goldenWorkspace1, workspaces["workspace1"]) + require.Equal(t, goldenWorkspace2, workspaces["workspace2"]) + srcDefinitions := getSourceDefinitions(cache) + require.Len(t, srcDefinitions, 1) + require.Contains(t, srcDefinitions, "close_crm") + require.Equal(t, &SourceDefinition{Name: "Close CRM"}, srcDefinitions["close_crm"]) + dstDefinitions := getDestinationDefinitions(cache) + require.Len(t, dstDefinitions, 0) + } + + // in the second call we get the same two workspaces but with no updates so they'll both be null. + // therefore nothing should change in the cache. + secondCall, err := os.ReadFile("./testdata/call_02.json") + require.NoError(t, err) + + response = &WorkspaceConfigs{} + err = stdjson.Unmarshal(secondCall, &response) + require.NoError(t, err) + { + workspaces := getWorkspaces(response) + require.Len(t, workspaces, 2) + require.Contains(t, workspaces, "workspace1") + require.Contains(t, workspaces, "workspace2") + require.Nil(t, workspaces["workspace1"]) + require.Nil(t, workspaces["workspace2"]) + srcDefinitions := getSourceDefinitions(response) + require.Len(t, srcDefinitions, 1) + require.Contains(t, srcDefinitions, "close_crm") + require.Equal(t, &SourceDefinition{Name: "Close CRM"}, srcDefinitions["close_crm"]) + dstDefinitions := getDestinationDefinitions(response) + require.Len(t, dstDefinitions, 0) + } + + updateAfter, updated, err = updater.UpdateCache(response, cache) + require.NoError(t, err) + require.False(t, updated) + require.Equal(t, time.Date(2021, 9, 1, 6, 6, 6, 0, time.UTC), updateAfter) + { + workspaces := getWorkspaces(cache) + require.Len(t, workspaces, 2) + require.Contains(t, workspaces, "workspace1") + require.Contains(t, workspaces, "workspace2") + require.Equal(t, goldenWorkspace1, workspaces["workspace1"]) + require.Equal(t, goldenWorkspace2, workspaces["workspace2"]) + srcDefinitions := getSourceDefinitions(cache) + require.Len(t, srcDefinitions, 1) + require.Contains(t, srcDefinitions, "close_crm") + require.Equal(t, &SourceDefinition{Name: "Close CRM"}, srcDefinitions["close_crm"]) + dstDefinitions := getDestinationDefinitions(cache) + require.Len(t, dstDefinitions, 0) + } + + // in the third call workspace1 is not updated, workspace2 is deleted, and we receive a new workspace3. + thirdCall, err := os.ReadFile("./testdata/call_03.json") + require.NoError(t, err) + + response = &WorkspaceConfigs{} + err = stdjson.Unmarshal(thirdCall, &response) + require.NoError(t, err) + { + workspaces := getWorkspaces(response) + require.Len(t, workspaces, 2) + require.Contains(t, workspaces, "workspace1") + require.NotContains(t, workspaces, "workspace2") + require.Contains(t, workspaces, "workspace3") + require.Nil(t, workspaces["workspace1"]) + require.Equal(t, goldenWorkspace3, workspaces["workspace3"]) + srcDefinitions := getSourceDefinitions(response) + require.Len(t, srcDefinitions, 1) + require.Contains(t, srcDefinitions, "close_crm") + require.Equal(t, &SourceDefinition{Name: "Close CRM"}, srcDefinitions["close_crm"]) + dstDefinitions := getDestinationDefinitions(response) + require.Len(t, dstDefinitions, 0) + } + + updateAfter, updated, err = updater.UpdateCache(response, cache) + require.NoError(t, err) + require.True(t, updated) + require.Equal(t, time.Date(2021, 9, 1, 6, 6, 7, 0, time.UTC), updateAfter) + { + workspaces := getWorkspaces(cache) + require.Len(t, workspaces, 2) + require.Contains(t, workspaces, "workspace1") + require.Contains(t, workspaces, "workspace3") + require.Equal(t, goldenWorkspace1, workspaces["workspace1"]) + require.Equal(t, goldenWorkspace3, workspaces["workspace3"]) + } + + // in the fourth call workspace1 is not updated but workspace3 is. + fourthCall, err := os.ReadFile("./testdata/call_04.json") + require.NoError(t, err) + + response = &WorkspaceConfigs{} + err = stdjson.Unmarshal(fourthCall, &response) + require.NoError(t, err) + { + workspaces := getWorkspaces(response) + require.Len(t, workspaces, 2) + require.Contains(t, workspaces, "workspace1") + require.Contains(t, workspaces, "workspace3") + require.Nil(t, workspaces["workspace1"]) + require.Equal(t, goldenUpdatedWorkspace3, workspaces["workspace3"]) + srcDefinitions := getSourceDefinitions(response) + require.Len(t, srcDefinitions, 2) + require.Contains(t, srcDefinitions, "close_crm") + require.Equal(t, &SourceDefinition{Name: "Close CRM"}, srcDefinitions["close_crm"]) + require.Contains(t, srcDefinitions, "singer-klaviyo") + require.Equal(t, &SourceDefinition{Name: "Klaviyo"}, srcDefinitions["singer-klaviyo"]) + dstDefinitions := getDestinationDefinitions(response) + require.Len(t, dstDefinitions, 1) + require.Contains(t, dstDefinitions, "LINKEDIN_ADS") + require.Equal(t, &DestinationDefinition{Name: "LinkedIn Ads"}, dstDefinitions["LINKEDIN_ADS"]) + } + + updateAfter, updated, err = updater.UpdateCache(response, cache) + require.NoError(t, err) + require.True(t, updated) + require.Equal(t, time.Date(2021, 9, 1, 6, 6, 8, 0, time.UTC), updateAfter) + { + workspaces := getWorkspaces(cache) + require.Len(t, workspaces, 2) + require.Contains(t, workspaces, "workspace1") + require.Contains(t, workspaces, "workspace3") + require.Equal(t, goldenWorkspace1, workspaces["workspace1"]) + require.Equal(t, goldenUpdatedWorkspace3, workspaces["workspace3"]) + srcDefinitions := getSourceDefinitions(cache) + require.Len(t, srcDefinitions, 2) + require.Contains(t, srcDefinitions, "close_crm") + require.Equal(t, &SourceDefinition{Name: "Close CRM"}, srcDefinitions["close_crm"]) + require.Contains(t, srcDefinitions, "singer-klaviyo") + require.Equal(t, &SourceDefinition{Name: "Klaviyo"}, srcDefinitions["singer-klaviyo"]) + dstDefinitions := getDestinationDefinitions(cache) + require.Len(t, dstDefinitions, 1) + require.Contains(t, dstDefinitions, "LINKEDIN_ADS") + require.Equal(t, &DestinationDefinition{Name: "LinkedIn Ads"}, dstDefinitions["LINKEDIN_ADS"]) + } +} + +type WorkspaceConfigs struct { + Workspaces Workspaces `json:"workspaces"` + SourceDefinitions SourceDefinitions `json:"sourceDefinitions"` + DestinationDefinitions DestinationDefinitions `json:"destinationDefinitions"` +} + +func (wcs *WorkspaceConfigs) Updateables() iter.Seq[UpdateableList[string, UpdateableElement]] { + return func(yield func(UpdateableList[string, UpdateableElement]) bool) { + yield(&wcs.Workspaces) + } +} + +func (wcs *WorkspaceConfigs) NonUpdateables() iter.Seq[NonUpdateablesList[string, any]] { + return func(yield func(NonUpdateablesList[string, any]) bool) { + if !yield(&wcs.SourceDefinitions) { + return + } + if !yield(&wcs.DestinationDefinitions) { + return + } + } +} + +type Workspaces map[string]*WorkspaceConfig + +func (ws *Workspaces) Type() string { return "Workspaces" } +func (ws *Workspaces) Length() int { return len(*ws) } +func (ws *Workspaces) Reset() { *ws = make(map[string]*WorkspaceConfig) } + +func (ws *Workspaces) List() iter.Seq2[string, UpdateableElement] { + return func(yield func(string, UpdateableElement) bool) { + for key, wc := range *ws { + if !yield(key, wc) { + break + } + } + } +} + +func (ws *Workspaces) GetElementByKey(id string) (UpdateableElement, bool) { + wc, ok := (*ws)[id] + return wc, ok +} + +func (ws *Workspaces) SetElementByKey(id string, object UpdateableElement) { + if *ws == nil { + *ws = make(map[string]*WorkspaceConfig) + } + (*ws)[id] = object.(*WorkspaceConfig) +} + +type WorkspaceConfig struct { + Sources map[string]*Source `json:"sources"` + Destinations map[string]*Destination `json:"destinations"` + Connections map[string]*Connection `json:"connections"` + EventReplays map[string]*EventReplay `json:"eventReplays"` + UpdatedAt time.Time `json:"updatedAt"` +} + +func (wc *WorkspaceConfig) GetUpdatedAt() time.Time { return wc.UpdatedAt } +func (wc *WorkspaceConfig) IsNil() bool { return wc == nil } + +type Source struct { + Name string `json:"name"` + WriteKey string `json:"writeKey"` + Enabled bool `json:"enabled"` + DefinitionName string `json:"sourceDefinitionName"` + Config stdjson.RawMessage `json:"config"` + Deleted bool `json:"deleted"` +} + +type Destination struct { + Enabled bool `json:"enabled"` +} + +type Connection struct { + SourceID string `json:"sourceId"` + DestinationID string `json:"destinationId"` + ProcessorEnabled bool `json:"processorEnabled"` +} + +type EventReplay struct { + Sources map[string]*SourceReplay `json:"sources"` + Destinations map[string]*DestinationReplay `json:"destinations"` + Connections []ConnectionReplay `json:"connections"` +} + +type SourceReplay struct { + OriginalID string `json:"originalId"` +} + +type DestinationReplay struct { + OriginalID string `json:"originalId"` +} + +type ConnectionReplay struct { + SourceID string `json:"sourceId"` + DestinationID string `json:"destinationId"` +} + +type SourceDefinition struct { + Name string `json:"name"` +} + +type SourceDefinitions map[string]*SourceDefinition + +func (sd *SourceDefinitions) Type() string { return "SourceDefinitions" } +func (sd *SourceDefinitions) Reset() { *sd = make(map[string]*SourceDefinition) } +func (sd *SourceDefinitions) SetElementByKey(id string, object any) { + (*sd)[id] = object.(*SourceDefinition) +} + +func (sd *SourceDefinitions) List() iter.Seq2[string, any] { + return func(yield func(string, any) bool) { + for key, d := range *sd { + if !yield(key, d) { + break + } + } + } +} + +type DestinationDefinition struct { + Name string `json:"name"` +} + +type DestinationDefinitions map[string]*DestinationDefinition + +func (dd *DestinationDefinitions) Type() string { return "DestinationDefinitions" } +func (dd *DestinationDefinitions) Reset() { *dd = make(map[string]*DestinationDefinition) } +func (dd *DestinationDefinitions) SetElementByKey(id string, object any) { + (*dd)[id] = object.(*DestinationDefinition) +} + +func (dd *DestinationDefinitions) List() iter.Seq2[string, any] { + return func(yield func(string, any) bool) { + for key, d := range *dd { + if !yield(key, d) { + break + } + } + } +} + +func getWorkspaces(v UpdateableObject[string]) map[string]*WorkspaceConfig { + return v.(*WorkspaceConfigs).Workspaces +} + +func getSourceDefinitions(v UpdateableObject[string]) map[string]*SourceDefinition { + return v.(*WorkspaceConfigs).SourceDefinitions +} + +func getDestinationDefinitions(v UpdateableObject[string]) map[string]*DestinationDefinition { + return v.(*WorkspaceConfigs).DestinationDefinitions +} diff --git a/diff/golden_test.go b/diff/golden_test.go new file mode 100644 index 0000000..bf03c6a --- /dev/null +++ b/diff/golden_test.go @@ -0,0 +1,95 @@ +package diff + +import "time" + +var goldenWorkspace1 = &WorkspaceConfig{ + Sources: map[string]*Source{ + "source1": { + Name: "Dev Integration Test 1", + WriteKey: "writeKey1", + Enabled: true, + DefinitionName: "webhook", + Config: []byte(`{ + "k1": "v1", + "k2": 123, + "k3": true, + "k4": 123.123 + }`), + }, + }, + Destinations: map[string]*Destination{ + "destination1": {Enabled: true}, + }, + Connections: map[string]*Connection{ + "connectionID1": {SourceID: "source1", DestinationID: "destination1", ProcessorEnabled: true}, + }, + UpdatedAt: time.Date(2021, 9, 1, 1, 2, 3, 0, time.UTC), +} + +var goldenWorkspace2 = &WorkspaceConfig{ + Sources: map[string]*Source{ + "source2": { + Name: "Dev Integration Test 2", + WriteKey: "writeKey2", + Enabled: true, + DefinitionName: "webhook", + Config: []byte(`{ + "k5": "v1", + "k6": 123, + "k7": true, + "k8": 123.123 + }`), + }, + }, + Destinations: map[string]*Destination{ + "destination2": {Enabled: true}, + }, + Connections: map[string]*Connection{ + "connectionID2": {SourceID: "source2", DestinationID: "destination2", ProcessorEnabled: true}, + }, + UpdatedAt: time.Date(2021, 9, 1, 6, 6, 6, 0, time.UTC), +} + +var goldenWorkspace3 = &WorkspaceConfig{ + Sources: map[string]*Source{ + "source3": { + Name: "Dev Integration Test 3", + WriteKey: "writeKey3", + Enabled: true, + DefinitionName: "webhook", + Config: []byte(`{ + "k9": "v1", + "k0": 123 + }`), + }, + }, + Destinations: map[string]*Destination{ + "destination3": {Enabled: true}, + }, + Connections: map[string]*Connection{ + "connectionID3": {SourceID: "source3", DestinationID: "destination3", ProcessorEnabled: true}, + }, + UpdatedAt: time.Date(2021, 9, 1, 6, 6, 7, 0, time.UTC), +} + +var goldenUpdatedWorkspace3 = &WorkspaceConfig{ + Sources: map[string]*Source{ + "source3": { + Name: "Dev Integration Test 3 - DISABLED", + WriteKey: "writeKey3", + Enabled: false, + DefinitionName: "webhook", + Config: []byte(`{ + "k9": "v1", + "k0": 123 + }`), + }, + }, + Destinations: map[string]*Destination{ + "destination3": {Enabled: true}, + }, + Connections: map[string]*Connection{ + "connectionID3": {SourceID: "source3", DestinationID: "destination3", ProcessorEnabled: true}, + }, + UpdatedAt: time.Date(2021, 9, 1, 6, 6, 8, 0, time.UTC), +} diff --git a/diff/testdata/call_01.json b/diff/testdata/call_01.json new file mode 100644 index 0000000..365b50c --- /dev/null +++ b/diff/testdata/call_01.json @@ -0,0 +1,63 @@ +{ + "workspaces": { + "workspace1": { + "sources": { + "source1": { + "name": "Dev Integration Test 1", + "writeKey": "writeKey1", + "sourceDefinitionName": "webhook", + "enabled": true, + "config": { + "k1": "v1", + "k2": 123, + "k3": true, + "k4": 123.123 + } + } + }, + "destinations": { + "destination1": {"name":"Amplitude","enabled":true} + }, + "connections": { + "connectionID1": { + "sourceId": "source1", + "destinationId": "destination1", + "processorEnabled": true + } + }, + "updatedAt": "2021-09-01T01:02:03Z" + }, + "workspace2": { + "sources": { + "source2": { + "name": "Dev Integration Test 2", + "writeKey": "writeKey2", + "sourceDefinitionName": "webhook", + "enabled": true, + "config": { + "k5": "v1", + "k6": 123, + "k7": true, + "k8": 123.123 + } + } + }, + "destinations": { + "destination2": {"name":"Postgres","enabled":true} + }, + "connections": { + "connectionID2": { + "sourceId": "source2", + "destinationId": "destination2", + "processorEnabled": true + } + }, + "updatedAt": "2021-09-01T06:06:06Z" + } + }, + "sourceDefinitions": { + "close_crm": { + "name": "Close CRM" + } + } +} \ No newline at end of file diff --git a/diff/testdata/call_02.json b/diff/testdata/call_02.json new file mode 100644 index 0000000..bff532d --- /dev/null +++ b/diff/testdata/call_02.json @@ -0,0 +1,11 @@ +{ + "workspaces": { + "workspace1": null, + "workspace2": null + }, + "sourceDefinitions": { + "close_crm": { + "name": "Close CRM" + } + } +} \ No newline at end of file diff --git a/diff/testdata/call_03.json b/diff/testdata/call_03.json new file mode 100644 index 0000000..bb76a1f --- /dev/null +++ b/diff/testdata/call_03.json @@ -0,0 +1,35 @@ +{ + "workspaces": { + "workspace1": null, + "workspace3": { + "sources": { + "source3": { + "name": "Dev Integration Test 3", + "writeKey": "writeKey3", + "sourceDefinitionName": "webhook", + "enabled": true, + "config": { + "k9": "v1", + "k0": 123 + } + } + }, + "destinations": { + "destination3": {"name":"MySQL","enabled":true} + }, + "connections": { + "connectionID3": { + "sourceId": "source3", + "destinationId": "destination3", + "processorEnabled": true + } + }, + "updatedAt": "2021-09-01T06:06:07Z" + } + }, + "sourceDefinitions": { + "close_crm": { + "name": "Close CRM" + } + } +} \ No newline at end of file diff --git a/diff/testdata/call_04.json b/diff/testdata/call_04.json new file mode 100644 index 0000000..96b90f4 --- /dev/null +++ b/diff/testdata/call_04.json @@ -0,0 +1,43 @@ +{ + "workspaces": { + "workspace1": null, + "workspace3": { + "sources": { + "source3": { + "name": "Dev Integration Test 3 - DISABLED", + "writeKey": "writeKey3", + "sourceDefinitionName": "webhook", + "enabled": false, + "config": { + "k9": "v1", + "k0": 123 + } + } + }, + "destinations": { + "destination3": {"name":"MySQL","enabled":true} + }, + "connections": { + "connectionID3": { + "sourceId": "source3", + "destinationId": "destination3", + "processorEnabled": true + } + }, + "updatedAt": "2021-09-01T06:06:08Z" + } + }, + "sourceDefinitions": { + "close_crm": { + "name": "Close CRM" + }, + "singer-klaviyo": { + "name": "Klaviyo" + } + }, + "destinationDefinitions": { + "LINKEDIN_ADS": { + "name": "LinkedIn Ads" + } + } +} \ No newline at end of file diff --git a/go.mod b/go.mod index e615002..1e9cc8d 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,12 @@ module github.com/rudderlabs/rudder-cp-sdk -go 1.22.5 +go 1.23.4 require ( - github.com/golang/mock v1.6.0 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/json-iterator/go v1.1.12 github.com/rudderlabs/rudder-go-kit v0.40.0 + github.com/rudderlabs/rudder-observability-kit v0.0.3 github.com/stretchr/testify v1.9.0 ) diff --git a/go.sum b/go.sum index 7ea52a6..9bf2892 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -6,8 +8,6 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= -github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -39,6 +39,8 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rudderlabs/rudder-go-kit v0.40.0 h1:Vk/NZm2DUuOiMmTSKUWYQVbXkl4If9KdGQOjNpXCPC4= github.com/rudderlabs/rudder-go-kit v0.40.0/go.mod h1:GtOYIFfVvNcXabgGytoGdsjdpKTH6PipFIom0bY94WQ= +github.com/rudderlabs/rudder-observability-kit v0.0.3 h1:vZtuZRkGX+6rjaeKtxxFE2YYP6QlmAcVcgecTOjvz+Q= +github.com/rudderlabs/rudder-observability-kit v0.0.3/go.mod h1:6UjAh3H6rkE0fFLh7z8ZGQEQbKtUkRfhWOf/OUhfqW8= github.com/sagikazarmark/locafero v0.6.0 h1:ON7AQg37yzcRPU69mt7gwhFEBwxI6P9T4Qu3N51bwOk= github.com/sagikazarmark/locafero v0.6.0/go.mod h1:77OmuIc6VTraTXKXIs/uvUxKGUXjE1GbemJYHqdNjX0= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= @@ -59,7 +61,6 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w= github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -68,36 +69,14 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA= golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/cache/cache.go b/internal/cache/cache.go deleted file mode 100644 index 6daf712..0000000 --- a/internal/cache/cache.go +++ /dev/null @@ -1,120 +0,0 @@ -package cache - -import ( - "sync" - - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/rudderlabs/rudder-cp-sdk/notifications" -) - -type WorkspaceConfigCache struct { - configs *modelv2.WorkspaceConfigs - updateLock sync.Mutex - subscribers []chan notifications.WorkspaceConfigNotification - subscriberLock sync.Mutex -} - -type WorkspaceConfigNotification struct{} - -// Get returns a copy of the current workspace configs. -// This is in order to avoid race conditions when consumers read the configs while they are being updated. -func (c *WorkspaceConfigCache) Get() *modelv2.WorkspaceConfigs { - c.updateLock.Lock() - defer c.updateLock.Unlock() - - if c.configs == nil { - return nil - } - - return copyConfigs(c.configs) -} - -// Set updates the current workspace configs by merging input with current cache contents. -// Workspace configs are merged by id, so if a workspace config is updated, it will replace the previous one. -// If a workspace config is not included in input, it will be removed from the cache. -// If a workspace config is nil in input, it will not be updated. -// Source and destination definitions are merged without removing any missing definitions. -// It notifies all subscribers of the update. -func (c *WorkspaceConfigCache) Set(configs *modelv2.WorkspaceConfigs) { - c.updateLock.Lock() - c.merge(configs) - c.updateLock.Unlock() - - c.subscriberLock.Lock() - defer c.subscriberLock.Unlock() - - for _, subscriber := range c.subscribers { - subscriber <- notifications.WorkspaceConfigNotification{} - } -} - -func (c *WorkspaceConfigCache) merge(configs *modelv2.WorkspaceConfigs) { - if c.configs == nil { - c.configs = modelv2.Empty() - } - - // merge source and destination definitions - for id, config := range configs.SourceDefinitions { - c.configs.SourceDefinitions[id] = config - } - - for id, config := range configs.DestinationDefinitions { - c.configs.DestinationDefinitions[id] = config - } - - // remove deleted workspace configs (missing ids) - currentIds := make([]string, 0, len(c.configs.Workspaces)) - for id := range c.configs.Workspaces { - currentIds = append(currentIds, id) - } - - for _, id := range currentIds { - if _, ok := configs.Workspaces[id]; !ok { - delete(c.configs.Workspaces, id) - } - } - - // merge workspace configs with updates (not nil values) - for id, config := range configs.Workspaces { - if config != nil { - c.configs.Workspaces[id] = config - } - } -} - -// Subscribe returns a subscriber that will be notified of any updates to the workspace configs. -// Subscribers are notified in the order they are subscribed. -// They can monitor for updates by reading from the notifications channel, provided by the Notifications function. -// It is expected to handle any notifications in a timely manner, otherwise it will block the cache from updating. -func (c *WorkspaceConfigCache) Subscribe() chan notifications.WorkspaceConfigNotification { - c.subscriberLock.Lock() - defer c.subscriberLock.Unlock() - - subscriber := make(chan notifications.WorkspaceConfigNotification) - - c.subscribers = append(c.subscribers, subscriber) - - return subscriber -} - -func copyConfigs(c *modelv2.WorkspaceConfigs) *modelv2.WorkspaceConfigs { - wc := &modelv2.WorkspaceConfigs{ - SourceDefinitions: make(map[string]*modelv2.SourceDefinition), - DestinationDefinitions: make(map[string]*modelv2.DestinationDefinition), - Workspaces: make(map[string]*modelv2.WorkspaceConfig), - } - - for k, v := range c.SourceDefinitions { - wc.SourceDefinitions[k] = v - } - - for k, v := range c.DestinationDefinitions { - wc.DestinationDefinitions[k] = v - } - - for k, v := range c.Workspaces { - wc.Workspaces[k] = v - } - - return wc -} diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go deleted file mode 100644 index 848f80b..0000000 --- a/internal/cache/cache_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package cache_test - -import ( - "sync" - "testing" - "time" - - "github.com/rudderlabs/rudder-cp-sdk/internal/cache" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/stretchr/testify/assert" -) - -func TestCache(t *testing.T) { - c := &cache.WorkspaceConfigCache{} - assert.Nil(t, c.Get(), "cache starts empty") - - c.Set(fullConfig) - assert.Equal(t, fullConfig, c.Get(), "cache contains full config") - - c.Set(updatedConfig) - assert.Equal(t, expectedMergedConfig, c.Get(), "cache contains merged config") -} - -func TestCacheSubscriptions(t *testing.T) { - c := &cache.WorkspaceConfigCache{} - - s := c.Subscribe() - - configs := make([]*modelv2.WorkspaceConfigs, 0) - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - for range s { - configs = append(configs, c.Get()) - wg.Done() - if len(configs) == 2 { - return - } - } - }() - - c.Set(fullConfig) - c.Set(updatedConfig) - - wg.Wait() - - assert.Equal(t, fullConfig, configs[0]) - assert.Equal(t, expectedMergedConfig, configs[1]) -} - -var fullConfig = &modelv2.WorkspaceConfigs{ - SourceDefinitions: map[string]*modelv2.SourceDefinition{ - "src-def-1": { - Name: "src-def-1", - }, - "src-def-2": { - Name: "src-def-2", - }, - }, - DestinationDefinitions: map[string]*modelv2.DestinationDefinition{ - "dst-def-1": { - Name: "dst-def-1", - }, - "dst-def-2": { - Name: "dst-def-2", - }, - }, - Workspaces: map[string]*modelv2.WorkspaceConfig{ - "ws-1": { - UpdatedAt: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), - }, - "ws-2": { - UpdatedAt: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC), - }, - "ws-3": { - UpdatedAt: time.Date(2020, 1, 3, 0, 0, 0, 0, time.UTC), - }, - }, -} - -var updatedConfig = &modelv2.WorkspaceConfigs{ - SourceDefinitions: map[string]*modelv2.SourceDefinition{ - "src-def-1": { - Name: "src-def-1-updated", - }, - "src-def-3": { - Name: "src-def-3", - }, - }, - DestinationDefinitions: map[string]*modelv2.DestinationDefinition{ - "dst-def-2": { - Name: "dst-def-2-updated", - }, - "dst-def-3": { - Name: "dst-def-3", - }, - }, - Workspaces: map[string]*modelv2.WorkspaceConfig{ - "ws-1": { - UpdatedAt: time.Date(2020, 1, 4, 0, 0, 0, 0, time.UTC), - }, - // no update for ws-2 - "ws-2": nil, - // ws-3 removed - "ws-4": { - UpdatedAt: time.Date(2020, 1, 5, 0, 0, 0, 0, time.UTC), - }, - }, -} - -var expectedMergedConfig = &modelv2.WorkspaceConfigs{ - SourceDefinitions: map[string]*modelv2.SourceDefinition{ - "src-def-1": updatedConfig.SourceDefinitions["src-def-1"], - "src-def-2": fullConfig.SourceDefinitions["src-def-2"], - "src-def-3": updatedConfig.SourceDefinitions["src-def-3"], - }, - DestinationDefinitions: map[string]*modelv2.DestinationDefinition{ - "dst-def-1": fullConfig.DestinationDefinitions["dst-def-1"], - "dst-def-2": updatedConfig.DestinationDefinitions["dst-def-2"], - "dst-def-3": updatedConfig.DestinationDefinitions["dst-def-3"], - }, - Workspaces: map[string]*modelv2.WorkspaceConfig{ - "ws-1": updatedConfig.Workspaces["ws-1"], - "ws-2": fullConfig.Workspaces["ws-2"], - "ws-4": updatedConfig.Workspaces["ws-4"], - }, -} diff --git a/internal/clients/base/client.go b/internal/clients/base/client.go index 1efe615..0ef4ac4 100644 --- a/internal/clients/base/client.go +++ b/internal/clients/base/client.go @@ -7,10 +7,13 @@ import ( "io" "net/http" "net/url" + "time" "github.com/rudderlabs/rudder-go-kit/httputil" ) +const updatedAfterTimeFormat = "2006-01-02T15:04:05.000Z" + type Client struct { HTTPClient HTTPClient BaseURL *url.URL @@ -20,12 +23,20 @@ type HTTPClient interface { Do(req *http.Request) (*http.Response, error) } -func (c *Client) Url(path string) string { - return c.BaseURL.JoinPath(path).String() +func (c *Client) Url(path string, updatedAfter time.Time) string { + v := c.BaseURL.JoinPath(path) + + if !updatedAfter.IsZero() { + queryValues := v.Query() + queryValues.Add("updatedAfter", updatedAfter.Format(updatedAfterTimeFormat)) + v.RawQuery = queryValues.Encode() + } + + return v.String() } -func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) { - req, err := http.NewRequestWithContext(ctx, "GET", c.Url(path), nil) +func (c *Client) Get(ctx context.Context, path string, updatedAfter time.Time) (*http.Request, error) { + req, err := http.NewRequestWithContext(ctx, "GET", c.Url(path, updatedAfter), nil) if err != nil { return nil, err } diff --git a/internal/clients/namespace/client.go b/internal/clients/namespace/client.go index 1aec44c..a72b3fb 100644 --- a/internal/clients/namespace/client.go +++ b/internal/clients/namespace/client.go @@ -2,7 +2,6 @@ package namespace import ( "context" - "errors" "fmt" "io" "net/http" @@ -12,8 +11,6 @@ import ( "github.com/rudderlabs/rudder-cp-sdk/identity" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/base" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/rudderlabs/rudder-cp-sdk/modelv2/parser" ) var json = jsoniter.ConfigFastest @@ -24,8 +21,8 @@ type Client struct { Identity *identity.Namespace } -func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) { - req, err := c.Client.Get(ctx, path) +func (c *Client) Get(ctx context.Context, path string, updatedAfter time.Time) (*http.Request, error) { + req, err := c.Client.Get(ctx, path, updatedAfter) if err != nil { return nil, err } @@ -35,8 +32,8 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) { return req, nil } -func (c *Client) getWorkspaceConfigsReader(ctx context.Context) (io.ReadCloser, error) { - req, err := c.Get(ctx, "/configuration/v2/namespaces/"+c.Identity.Namespace) +func (c *Client) getWorkspaceConfigsReader(ctx context.Context, updatedAfter time.Time) (io.ReadCloser, error) { + req, err := c.Get(ctx, "/configuration/v2/namespaces/"+c.Identity.Namespace, updatedAfter) if err != nil { return nil, err } @@ -44,24 +41,8 @@ func (c *Client) getWorkspaceConfigsReader(ctx context.Context) (io.ReadCloser, return c.Send(req) } -func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) { - reader, err := c.getWorkspaceConfigsReader(ctx) - if err != nil { - return nil, err - } - - defer func() { _ = reader.Close() }() - - wcs, err := parser.Parse(reader) - if err != nil { - return nil, err - } - - return wcs, nil -} - -func (c *Client) GetCustomWorkspaceConfigs(ctx context.Context, object any) error { - reader, err := c.getWorkspaceConfigsReader(ctx) +func (c *Client) GetWorkspaceConfigs(ctx context.Context, object any, updatedAfter time.Time) error { + reader, err := c.getWorkspaceConfigsReader(ctx, updatedAfter) if err != nil { return err } @@ -74,7 +55,3 @@ func (c *Client) GetCustomWorkspaceConfigs(ctx context.Context, object any) erro return nil } - -func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) { - return nil, errors.New("not implemented") -} diff --git a/internal/clients/workspace/client.go b/internal/clients/workspace/client.go index 34a6f3f..49a19fb 100644 --- a/internal/clients/workspace/client.go +++ b/internal/clients/workspace/client.go @@ -2,7 +2,6 @@ package workspace import ( "context" - "errors" "fmt" "io" "net/http" @@ -12,8 +11,6 @@ import ( "github.com/rudderlabs/rudder-cp-sdk/identity" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/base" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/rudderlabs/rudder-cp-sdk/modelv2/parser" ) var json = jsoniter.ConfigFastest @@ -24,8 +21,8 @@ type Client struct { Identity *identity.Workspace } -func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) { - req, err := c.Client.Get(ctx, path) +func (c *Client) Get(ctx context.Context, path string, updatedAfter time.Time) (*http.Request, error) { + req, err := c.Client.Get(ctx, path, updatedAfter) if err != nil { return nil, err } @@ -35,8 +32,8 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) { return req, nil } -func (c *Client) getWorkspaceConfigsReader(ctx context.Context) (io.ReadCloser, error) { - req, err := c.Get(ctx, "/data-plane/v2/workspaceConfig") +func (c *Client) getWorkspaceConfigsReader(ctx context.Context, updatedAfter time.Time) (io.ReadCloser, error) { + req, err := c.Get(ctx, "/data-plane/v2/workspaceConfig", updatedAfter) if err != nil { return nil, err } @@ -44,24 +41,8 @@ func (c *Client) getWorkspaceConfigsReader(ctx context.Context) (io.ReadCloser, return c.Send(req) } -func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) { - reader, err := c.getWorkspaceConfigsReader(ctx) - if err != nil { - return nil, err - } - - defer func() { _ = reader.Close() }() - - wcs, err := parser.Parse(reader) - if err != nil { - return nil, err - } - - return wcs, nil -} - -func (c *Client) GetCustomWorkspaceConfigs(ctx context.Context, object any) error { - reader, err := c.getWorkspaceConfigsReader(ctx) +func (c *Client) GetWorkspaceConfigs(ctx context.Context, object any, updatedAfter time.Time) error { + reader, err := c.getWorkspaceConfigsReader(ctx, updatedAfter) if err != nil { return err } @@ -74,7 +55,3 @@ func (c *Client) GetCustomWorkspaceConfigs(ctx context.Context, object any) erro return nil } - -func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) { - return nil, errors.New("not implemented") -} diff --git a/internal/poller/mocks/poller.go b/internal/poller/mocks/poller.go deleted file mode 100644 index 48ea0cb..0000000 --- a/internal/poller/mocks/poller.go +++ /dev/null @@ -1,67 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: poller.go - -// Package mocks is a generated GoMock package. -package mocks - -import ( - context "context" - reflect "reflect" - time "time" - - gomock "github.com/golang/mock/gomock" - modelv2 "github.com/rudderlabs/rudder-cp-sdk/modelv2" -) - -// MockClient is a mock of Client interface. -type MockClient struct { - ctrl *gomock.Controller - recorder *MockClientMockRecorder -} - -// MockClientMockRecorder is the mock recorder for MockClient. -type MockClientMockRecorder struct { - mock *MockClient -} - -// NewMockClient creates a new mock instance. -func NewMockClient(ctrl *gomock.Controller) *MockClient { - mock := &MockClient{ctrl: ctrl} - mock.recorder = &MockClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockClient) EXPECT() *MockClientMockRecorder { - return m.recorder -} - -// GetUpdatedWorkspaceConfigs mocks base method. -func (m *MockClient) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAt time.Time) (*modelv2.WorkspaceConfigs, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetUpdatedWorkspaceConfigs", ctx, updatedAt) - ret0, _ := ret[0].(*modelv2.WorkspaceConfigs) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetUpdatedWorkspaceConfigs indicates an expected call of GetUpdatedWorkspaceConfigs. -func (mr *MockClientMockRecorder) GetUpdatedWorkspaceConfigs(ctx, updatedAt interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUpdatedWorkspaceConfigs", reflect.TypeOf((*MockClient)(nil).GetUpdatedWorkspaceConfigs), ctx, updatedAt) -} - -// GetWorkspaceConfigs mocks base method. -func (m *MockClient) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetWorkspaceConfigs", ctx) - ret0, _ := ret[0].(*modelv2.WorkspaceConfigs) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetWorkspaceConfigs indicates an expected call of GetWorkspaceConfigs. -func (mr *MockClientMockRecorder) GetWorkspaceConfigs(ctx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkspaceConfigs", reflect.TypeOf((*MockClient)(nil).GetWorkspaceConfigs), ctx) -} diff --git a/internal/poller/options.go b/internal/poller/options.go deleted file mode 100644 index c421605..0000000 --- a/internal/poller/options.go +++ /dev/null @@ -1,27 +0,0 @@ -package poller - -import ( - "time" - - "github.com/rudderlabs/rudder-go-kit/logger" -) - -type Option func(*Poller) - -func WithClient(client Client) Option { - return func(p *Poller) { - p.client = client - } -} - -func WithPollingInterval(interval time.Duration) Option { - return func(p *Poller) { - p.interval = interval - } -} - -func WithLogger(log logger.Logger) Option { - return func(p *Poller) { - p.log = log - } -} diff --git a/internal/poller/poller.go b/internal/poller/poller.go deleted file mode 100644 index bd39e85..0000000 --- a/internal/poller/poller.go +++ /dev/null @@ -1,97 +0,0 @@ -//go:generate mockgen -source=poller.go -destination=mocks/poller.go -package=mocks -package poller - -import ( - "context" - "fmt" - "time" - - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/rudderlabs/rudder-go-kit/logger" -) - -// Poller periodically polls for new workspace configs and runs a handler on them. -type Poller struct { - client Client - interval time.Duration - handler WorkspaceConfigHandler - updatedAt time.Time - log logger.Logger -} - -type WorkspaceConfigHandler func(*modelv2.WorkspaceConfigs) error - -type Client interface { - GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) - GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAt time.Time) (*modelv2.WorkspaceConfigs, error) -} - -func New(handler WorkspaceConfigHandler, opts ...Option) (*Poller, error) { - p := &Poller{ - interval: 1 * time.Second, - handler: handler, - log: logger.NOP, - } - - for _, opt := range opts { - opt(p) - } - - if p.handler == nil { - return nil, fmt.Errorf("handler is required") - } - - if p.client == nil { - return nil, fmt.Errorf("client is required") - } - - return p, nil -} - -// Start starts the poller goroutine. It will poll for new workspace configs every interval. -// It will stop polling when the context is cancelled. -func (p *Poller) Start(ctx context.Context) { - go func() { - for { - select { - case <-ctx.Done(): - return - case <-time.After(p.interval): - if err := p.poll(ctx); err != nil { - p.log.Errorf("failed to poll for workspace configs: %v", err) - } - } - } - }() -} - -func (p *Poller) poll(ctx context.Context) error { - var response *modelv2.WorkspaceConfigs - if p.updatedAt.IsZero() { - p.log.Debug("polling for workspace configs") - wcs, err := p.client.GetWorkspaceConfigs(ctx) - if err != nil { - return fmt.Errorf("failed to get workspace configs: %w", err) - } - - response = wcs - } else { - p.log.Debugf("polling for workspace configs updated after %v", p.updatedAt) - wcs, err := p.client.GetUpdatedWorkspaceConfigs(ctx, p.updatedAt) - if err != nil { - return fmt.Errorf("failed to get updated workspace configs: %w", err) - } - - response = wcs - } - - if err := p.handler(response); err != nil { - return fmt.Errorf("failed to handle workspace configs: %w", err) - } - - // only update updatedAt if we managed to handle the response - // so that we don't miss any updates in case of an error - p.updatedAt = response.UpdatedAt() - - return nil -} diff --git a/internal/poller/poller_test.go b/internal/poller/poller_test.go deleted file mode 100644 index 8cd08cf..0000000 --- a/internal/poller/poller_test.go +++ /dev/null @@ -1,152 +0,0 @@ -package poller_test - -import ( - "context" - "errors" - "sync" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/rudderlabs/rudder-cp-sdk/internal/poller" - "github.com/rudderlabs/rudder-cp-sdk/internal/poller/mocks" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/stretchr/testify/require" -) - -func TestPollerNew(t *testing.T) { - t.Run("should return error if handler is nil", func(t *testing.T) { - p, err := poller.New(nil) - require.Nil(t, p) - require.Error(t, err) - }) - - t.Run("should return error if client is nil", func(t *testing.T) { - p, err := poller.New(func(*modelv2.WorkspaceConfigs) error { return nil }) - require.Nil(t, p) - require.Error(t, err) - }) -} - -func TestPoller(t *testing.T) { - ctrl := gomock.NewController(t) - - t.Run("should poll using client and workspace configs handler", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - client := mocks.NewMockClient(ctrl) - client.EXPECT().GetWorkspaceConfigs(gomock.Any()).Return(mockedResponses[0], nil).Times(1) - client.EXPECT().GetUpdatedWorkspaceConfigs(gomock.Any(), mockedResponses[0].UpdatedAt()).Return(mockedResponses[1], nil).Times(1) - client.EXPECT().GetUpdatedWorkspaceConfigs(gomock.Any(), mockedResponses[1].UpdatedAt()).Return(mockedResponses[2], nil).Times(1) - - var wg sync.WaitGroup - wg.Add(len(mockedResponses)) - expectedResponseIndex := 0 - - startTestPoller(t, ctx, client, func(wcs *modelv2.WorkspaceConfigs) error { - require.Equal(t, mockedResponses[expectedResponseIndex], wcs) - expectedResponseIndex++ - if expectedResponseIndex == len(mockedResponses) { - cancel() - } - wg.Done() - return nil - }) - - wg.Wait() - }) - - t.Run("should skip failed client requests", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - client := mocks.NewMockClient(ctrl) - client.EXPECT().GetWorkspaceConfigs(gomock.Any()).Return(nil, errors.New("first call failed")).Times(1) - client.EXPECT().GetWorkspaceConfigs(gomock.Any()).Return(mockedResponses[0], nil).Times(1) - client.EXPECT().GetUpdatedWorkspaceConfigs(gomock.Any(), mockedResponses[0].UpdatedAt()).Return(mockedResponses[1], nil).Times(1) - client.EXPECT().GetUpdatedWorkspaceConfigs(gomock.Any(), mockedResponses[1].UpdatedAt()).Return(nil, errors.New("fourth call failed")).Times(1) - client.EXPECT().GetUpdatedWorkspaceConfigs(gomock.Any(), mockedResponses[1].UpdatedAt()).Return(mockedResponses[2], nil).Times(1) - - var wg sync.WaitGroup - wg.Add(len(mockedResponses)) - expectedResponseIndex := 0 - - startTestPoller(t, ctx, client, func(wcs *modelv2.WorkspaceConfigs) error { - require.Equal(t, mockedResponses[expectedResponseIndex], wcs) - expectedResponseIndex++ - if expectedResponseIndex == len(mockedResponses) { - cancel() - } - wg.Done() - return nil - }) - - wg.Wait() - }) - - t.Run("should skip handler failures without updating updatedAt", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - client := mocks.NewMockClient(ctrl) - client.EXPECT().GetWorkspaceConfigs(gomock.Any()).Return(mockedResponses[0], nil).Times(1) - // this will be called twice, once for the first failed handler call and once for the second - client.EXPECT().GetWorkspaceConfigs(gomock.Any()).Return(mockedResponses[0], nil).Times(1) - client.EXPECT().GetUpdatedWorkspaceConfigs(gomock.Any(), mockedResponses[0].UpdatedAt()).Return(mockedResponses[1], nil).Times(1) - client.EXPECT().GetUpdatedWorkspaceConfigs(gomock.Any(), mockedResponses[1].UpdatedAt()).Return(mockedResponses[2], nil).Times(1) - - var wg sync.WaitGroup - wg.Add(len(mockedResponses)) - expectedResponseIndex := 0 - var hasReturnedError bool - // start a poller with handler that fails on first attempt and succeeds on second - startTestPoller(t, ctx, client, func(wcs *modelv2.WorkspaceConfigs) error { - if !hasReturnedError { - hasReturnedError = true - return errors.New("first call failed") - } - - expectedResponseIndex++ - if expectedResponseIndex == len(mockedResponses) { - cancel() - } - wg.Done() - return nil - }) - - wg.Wait() - }) -} - -func startTestPoller(t *testing.T, ctx context.Context, client poller.Client, handler poller.WorkspaceConfigHandler) { - p, err := poller.New(handler, - poller.WithClient(client), - poller.WithPollingInterval(1*time.Millisecond), - ) - require.NoError(t, err) - p.Start(ctx) -} - -var mockedResponses = []*modelv2.WorkspaceConfigs{ - { - Workspaces: map[string]*modelv2.WorkspaceConfig{ - "wc-1": {UpdatedAt: time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC)}, - "wc-2": {UpdatedAt: time.Date(2009, 11, 18, 20, 34, 58, 651387237, time.UTC)}, - "wc-3": {UpdatedAt: time.Date(2009, 11, 19, 20, 34, 58, 651387237, time.UTC)}, - }, - }, - { - Workspaces: map[string]*modelv2.WorkspaceConfig{ - "wc-1": nil, - "wc-2": {UpdatedAt: time.Date(2009, 11, 20, 20, 34, 58, 651387237, time.UTC)}, - "wc-3": nil, - }, - }, - { - Workspaces: map[string]*modelv2.WorkspaceConfig{ - "wc-1": {UpdatedAt: time.Date(2009, 11, 21, 20, 34, 58, 651387237, time.UTC)}, - "wc-4": {UpdatedAt: time.Date(2009, 11, 22, 20, 34, 58, 651387237, time.UTC)}, - }, - }, -} diff --git a/modelv2/parser/parser.go b/modelv2/parser/parser.go deleted file mode 100644 index 3319b9b..0000000 --- a/modelv2/parser/parser.go +++ /dev/null @@ -1,21 +0,0 @@ -package parser - -import ( - "io" - - jsoniter "github.com/json-iterator/go" - - "github.com/rudderlabs/rudder-cp-sdk/modelv2" -) - -var json = jsoniter.ConfigFastest - -func Parse(reader io.Reader) (*modelv2.WorkspaceConfigs, error) { - res := &modelv2.WorkspaceConfigs{} - - if err := json.NewDecoder(reader).Decode(res); err != nil { - return nil, err - } - - return res, nil -} diff --git a/modelv2/parser/parser_test.go b/modelv2/parser/parser_test.go deleted file mode 100644 index d119a1e..0000000 --- a/modelv2/parser/parser_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package parser_test - -import ( - "bytes" - "os" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/rudderlabs/rudder-cp-sdk/modelv2/parser" -) - -func TestParse(t *testing.T) { - data, err := os.Open("testdata/workspace_configs.v2.json") - require.NoError(t, err) - - wcs, err := parser.Parse(data) - require.NoError(t, err) - - require.Equal(t, wcs, &modelv2.WorkspaceConfigs{ - Workspaces: map[string]*modelv2.WorkspaceConfig{ - "ws-1": { - Sources: map[string]*modelv2.Source{ - "src-1-1": {}, - "src-1-2": {}, - }, - Destinations: map[string]*modelv2.Destination{ - "dst-1-1": {}, - "dst-1-2": {}, - }, - }, - }, - SourceDefinitions: map[string]*modelv2.SourceDefinition{ - "src-def-1": {}, - }, - DestinationDefinitions: map[string]*modelv2.DestinationDefinition{ - "src-def-2": {}, - }, - }) -} - -func TestParseError(t *testing.T) { - wcs, err := parser.Parse(bytes.NewReader([]byte(`{ malformed json }`))) - require.Nil(t, wcs) - require.Error(t, err) -} diff --git a/modelv2/parser/testdata/workspace_configs.v2.json b/modelv2/parser/testdata/workspace_configs.v2.json deleted file mode 100644 index 64d41f5..0000000 --- a/modelv2/parser/testdata/workspace_configs.v2.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "workspaces": { - "ws-1": { - "name": "Workspace 1", - "sources": { - "src-1-1": {}, - "src-1-2": {} - }, - "destinations": { - "dst-1-1": {}, - "dst-1-2": {} - } - } - }, - "sourceDefinitions": { - "src-def-1": {} - }, - "destinationDefinitions": { - "src-def-2": {} - } -} diff --git a/modelv2/workspaceconfigs.go b/modelv2/workspaceconfigs.go index 87e8e3f..96b2b33 100644 --- a/modelv2/workspaceconfigs.go +++ b/modelv2/workspaceconfigs.go @@ -1,15 +1,72 @@ package modelv2 -import "time" +import ( + "iter" + "time" + + "github.com/rudderlabs/rudder-cp-sdk/diff" +) + +var ( + _ diff.UpdateableObject[string] = &WorkspaceConfigs{} + _ diff.UpdateableList[string, diff.UpdateableElement] = &Workspaces{} + _ diff.NonUpdateablesList[string, any] = &SourceDefinitions{} + _ diff.NonUpdateablesList[string, any] = &DestinationDefinitions{} +) // WorkspaceConfigs represents workspace configurations of one or more workspaces, as well as definitions shared by all of them. type WorkspaceConfigs struct { - // Workspaces is an map of workspace configurations. The key is a workspace ID. - Workspaces map[string]*WorkspaceConfig `json:"workspaces"` + // Workspaces is a map of workspace configurations. The key is a workspace ID. + Workspaces Workspaces `json:"workspaces"` // SourceDefinitions is a map of source definitions. The key is a source definition name. - SourceDefinitions map[string]*SourceDefinition `json:"sourceDefinitions"` + SourceDefinitions SourceDefinitions `json:"sourceDefinitions"` // DestinationDefinitions is a map of destination definitions. The key is a destination definition name. - DestinationDefinitions map[string]*DestinationDefinition `json:"destinationDefinitions"` + DestinationDefinitions DestinationDefinitions `json:"destinationDefinitions"` +} + +func (wcs *WorkspaceConfigs) Updateables() iter.Seq[diff.UpdateableList[string, diff.UpdateableElement]] { + return func(yield func(diff.UpdateableList[string, diff.UpdateableElement]) bool) { + yield(&wcs.Workspaces) + } +} + +func (wcs *WorkspaceConfigs) NonUpdateables() iter.Seq[diff.NonUpdateablesList[string, any]] { + return func(yield func(diff.NonUpdateablesList[string, any]) bool) { + if !yield(&wcs.SourceDefinitions) { + return + } + if !yield(&wcs.DestinationDefinitions) { + return + } + } +} + +type Workspaces map[string]*WorkspaceConfig + +func (ws *Workspaces) Type() string { return "Workspaces" } +func (ws *Workspaces) Length() int { return len(*ws) } +func (ws *Workspaces) Reset() { *ws = make(map[string]*WorkspaceConfig) } + +func (ws *Workspaces) List() iter.Seq2[string, diff.UpdateableElement] { + return func(yield func(string, diff.UpdateableElement) bool) { + for key, wc := range *ws { + if !yield(key, wc) { + break + } + } + } +} + +func (ws *Workspaces) GetElementByKey(id string) (diff.UpdateableElement, bool) { + wc, ok := (*ws)[id] + return wc, ok +} + +func (ws *Workspaces) SetElementByKey(id string, object diff.UpdateableElement) { + if *ws == nil { + *ws = make(map[string]*WorkspaceConfig) + } + (*ws)[id] = object.(*WorkspaceConfig) } type WorkspaceConfig struct { @@ -29,23 +86,41 @@ type WorkspaceConfig struct { UpdatedAt time.Time `json:"updatedAt"` } -// UpdatedAt returns the maximum UpdatedAt value of all included workspace configs. -func (wcs *WorkspaceConfigs) UpdatedAt() time.Time { - var updateAt time.Time - for _, wc := range wcs.Workspaces { - if wc != nil && wc.UpdatedAt.After(updateAt) { - updateAt = wc.UpdatedAt +func (wc *WorkspaceConfig) GetUpdatedAt() time.Time { return wc.UpdatedAt } +func (wc *WorkspaceConfig) IsNil() bool { return wc == nil } + +type SourceDefinitions map[string]*SourceDefinition + +func (sd *SourceDefinitions) Type() string { return "SourceDefinitions" } +func (sd *SourceDefinitions) Reset() { *sd = make(map[string]*SourceDefinition) } +func (sd *SourceDefinitions) SetElementByKey(id string, object any) { + (*sd)[id] = object.(*SourceDefinition) +} + +func (sd *SourceDefinitions) List() iter.Seq2[string, any] { + return func(yield func(string, any) bool) { + for key, d := range *sd { + if !yield(key, d) { + break + } } } +} + +type DestinationDefinitions map[string]*DestinationDefinition - return updateAt +func (dd *DestinationDefinitions) Type() string { return "DestinationDefinitions" } +func (dd *DestinationDefinitions) Reset() { *dd = make(map[string]*DestinationDefinition) } +func (dd *DestinationDefinitions) SetElementByKey(id string, object any) { + (*dd)[id] = object.(*DestinationDefinition) } -// Empty returns an empty WorkspaceConfigs object. -func Empty() *WorkspaceConfigs { - return &WorkspaceConfigs{ - Workspaces: make(map[string]*WorkspaceConfig), - SourceDefinitions: make(map[string]*SourceDefinition), - DestinationDefinitions: make(map[string]*DestinationDefinition), +func (dd *DestinationDefinitions) List() iter.Seq2[string, any] { + return func(yield func(string, any) bool) { + for key, d := range *dd { + if !yield(key, d) { + break + } + } } } diff --git a/modelv2/workspaceconfigs_test.go b/modelv2/workspaceconfigs_test.go index 5971d82..4fc6ac9 100644 --- a/modelv2/workspaceconfigs_test.go +++ b/modelv2/workspaceconfigs_test.go @@ -4,13 +4,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-cp-sdk/diff" "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/stretchr/testify/assert" ) func TestWorkspaceConfigsUpdatedAt(t *testing.T) { wcs := &modelv2.WorkspaceConfigs{ - Workspaces: map[string]*modelv2.WorkspaceConfig{ + Workspaces: modelv2.Workspaces{ "ws-1": { UpdatedAt: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), }, @@ -21,6 +23,72 @@ func TestWorkspaceConfigsUpdatedAt(t *testing.T) { }, } - updatedAt := wcs.UpdatedAt() - assert.Equal(t, time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), updatedAt, "UpdatedAt should return the maximum of all workspace's UpdatedAt") + m := make(modelv2.Workspaces, len(wcs.Workspaces)) + for uo := range wcs.Updateables() { + for id, wc := range uo.List() { + require.Equal(t, wcs.Workspaces[id], wc) + m[id] = wc.(*modelv2.WorkspaceConfig) + } + } + + require.Len(t, m, 3) + require.Equal(t, wcs.Workspaces, m) + + require.Equal(t, time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), m["ws-1"].GetUpdatedAt()) + require.Equal(t, time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), m["ws-2"].GetUpdatedAt()) + require.True(t, m["ws-3"].IsNil()) + + var updateableList diff.UpdateableList[string, diff.UpdateableElement] = &wcs.Workspaces + ws, ok := updateableList.GetElementByKey("ws-1") + require.True(t, ok) + require.Equal(t, wcs.Workspaces["ws-1"], ws) + + ws, ok = updateableList.GetElementByKey("ws-2") + require.True(t, ok) + require.Equal(t, wcs.Workspaces["ws-2"], ws) + + ws, ok = updateableList.GetElementByKey("ws-3") + require.True(t, ok) + require.Nil(t, ws) + + updateableList.SetElementByKey("ws-3", &modelv2.WorkspaceConfig{ + UpdatedAt: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), + }) + ws, ok = updateableList.GetElementByKey("ws-3") + require.True(t, ok) + require.Equal(t, wcs.Workspaces["ws-3"], ws) + require.Equal(t, time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), wcs.Workspaces["ws-3"].GetUpdatedAt()) +} + +func TestNonUpdateables(t *testing.T) { + wcs := &modelv2.WorkspaceConfigs{ + Workspaces: modelv2.Workspaces{ + "ws-1": { + UpdatedAt: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), + }, + "ws-2": { + UpdatedAt: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + }, + "ws-3": nil, + }, + SourceDefinitions: modelv2.SourceDefinitions{ + "source-1": &modelv2.SourceDefinition{ + Name: "Source Definition 1", + }, + }, + DestinationDefinitions: modelv2.DestinationDefinitions{ + "destination-1": &modelv2.DestinationDefinition{ + Name: "Destination Definition 1", + }, + }, + } + + nonUpdateables := make([]diff.NonUpdateablesList[string, any], 0, 2) + for nu := range wcs.NonUpdateables() { + nonUpdateables = append(nonUpdateables, nu) + } + + require.Len(t, nonUpdateables, 2) + require.Equal(t, &wcs.SourceDefinitions, nonUpdateables[0]) + require.Equal(t, &wcs.DestinationDefinitions, nonUpdateables[1]) } diff --git a/options.go b/options.go index 38d8d88..769169c 100644 --- a/options.go +++ b/options.go @@ -3,7 +3,6 @@ package cpsdk import ( "fmt" "net/url" - "time" "github.com/rudderlabs/rudder-go-kit/logger" @@ -38,13 +37,14 @@ func WithNamespaceIdentity(namespace, secret string) Option { func WithBaseUrl(baseUrl string) Option { return func(cp *ControlPlane) error { - url, err := url.Parse(baseUrl) + u, err := url.Parse(baseUrl) if err != nil { return fmt.Errorf("invalid base url: %w", err) } - cp.baseUrl = url - cp.baseUrlV2 = url + cp.baseUrl = u + cp.baseUrlV2 = u + return nil } } @@ -62,13 +62,3 @@ func WithLogger(log logger.Logger) Option { return nil } } - -// WithPollingInterval sets the interval at which the SDK polls for new configs. -// If not set, the SDK will poll every 1 second. -// If set to 0, the SDK will not poll for new configs. -func WithPollingInterval(interval time.Duration) Option { - return func(cp *ControlPlane) error { - cp.pollingInterval = interval - return nil - } -} diff --git a/poller/options.go b/poller/options.go new file mode 100644 index 0000000..9f2d9f9 --- /dev/null +++ b/poller/options.go @@ -0,0 +1,41 @@ +package poller + +import ( + "time" + + "github.com/rudderlabs/rudder-go-kit/logger" +) + +type Option[K comparable] func(*WorkspaceConfigsPoller[K]) + +func WithLogger[K comparable](log logger.Logger) Option[K] { + return func(p *WorkspaceConfigsPoller[K]) { p.log = log } +} + +func WithPollingInterval[K comparable](d time.Duration) Option[K] { + return func(p *WorkspaceConfigsPoller[K]) { p.interval = d } +} + +func WithPollingBackoffInitialInterval[K comparable](d time.Duration) Option[K] { + return func(p *WorkspaceConfigsPoller[K]) { p.backoff.initialInterval = d } +} + +func WithPollingBackoffMaxInterval[K comparable](d time.Duration) Option[K] { + return func(p *WorkspaceConfigsPoller[K]) { p.backoff.maxInterval = d } +} + +func WithPollingMaxElapsedTime[K comparable](d time.Duration) Option[K] { + return func(p *WorkspaceConfigsPoller[K]) { p.backoff.maxElapsedTime = d } +} + +func WithPollingMaxRetries[K comparable](n uint64) Option[K] { + return func(p *WorkspaceConfigsPoller[K]) { p.backoff.maxRetries = n } +} + +func WithPollingBackoffMultiplier[K comparable](m float64) Option[K] { + return func(p *WorkspaceConfigsPoller[K]) { p.backoff.multiplier = m } +} + +func WithOnResponse[K comparable](f func(bool, error)) Option[K] { + return func(p *WorkspaceConfigsPoller[K]) { p.onResponse = f } +} diff --git a/poller/poller.go b/poller/poller.go new file mode 100644 index 0000000..0cb1415 --- /dev/null +++ b/poller/poller.go @@ -0,0 +1,161 @@ +package poller + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" + + "github.com/rudderlabs/rudder-cp-sdk/diff" + "github.com/rudderlabs/rudder-go-kit/logger" + obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" +) + +type WorkspaceConfigsGetter[K comparable] func(ctx context.Context, l diff.UpdateableObject[K], updatedAfter time.Time) error + +type WorkspaceConfigsHandler[K comparable] func(obj diff.UpdateableObject[K]) (time.Time, bool, error) + +// WorkspaceConfigsPoller periodically polls for new workspace configs and runs a handler on them. +type WorkspaceConfigsPoller[K comparable] struct { + getter WorkspaceConfigsGetter[K] + handler WorkspaceConfigsHandler[K] + constructor func() diff.UpdateableObject[K] + interval time.Duration + updatedAt time.Time + onResponse func(bool, error) + backoff struct { + initialInterval time.Duration + maxInterval time.Duration + maxElapsedTime time.Duration + maxRetries uint64 + multiplier float64 + } + log logger.Logger +} + +func NewWorkspaceConfigsPoller[K comparable]( + getter WorkspaceConfigsGetter[K], + handler WorkspaceConfigsHandler[K], + constructor func() diff.UpdateableObject[K], + opts ...Option[K], +) (*WorkspaceConfigsPoller[K], error) { + p := &WorkspaceConfigsPoller[K]{ + getter: getter, + handler: handler, + constructor: constructor, + interval: 1 * time.Second, + log: logger.NOP, + } + p.backoff.initialInterval = 1 * time.Second + p.backoff.maxInterval = 1 * time.Minute + p.backoff.maxElapsedTime = 5 * time.Minute + p.backoff.maxRetries = 15 + p.backoff.multiplier = 1.5 + + for _, opt := range opts { + opt(p) + } + + if p.getter == nil { + return nil, fmt.Errorf("getter is required") + } + + if p.handler == nil { + return nil, fmt.Errorf("handler is required") + } + + if p.constructor == nil { + return nil, fmt.Errorf("constructor is required") + } + + return p, nil +} + +// Run starts polling for new workspace configs every interval. +// It will stop polling when the context is cancelled. +func (p *WorkspaceConfigsPoller[K]) Run(ctx context.Context) { + // Try the first time with no delay + updated, err := p.poll(ctx) + if p.onResponse != nil { + p.onResponse(updated, err) + } + if err != nil { // Log the error and retry with backoff later, no need to retry here + p.log.Errorn("failed to poll workspace configs", obskit.Error(err)) + } + + for { + select { + case <-ctx.Done(): + return + case <-time.After(p.interval): + updated, err := p.poll(ctx) + if p.onResponse != nil { + p.onResponse(updated, err) + } + if err == nil { + continue + } + + p.log.Errorn("failed to poll workspace configs", obskit.Error(err)) + + nextBackOff := p.nextBackOff() + retryLoop: + for delay := nextBackOff(); delay != backoff.Stop; delay = nextBackOff() { + select { + case <-ctx.Done(): + return + case <-time.After(delay): + updated, err = p.poll(ctx) + if p.onResponse != nil { + p.onResponse(updated, err) + } + if err != nil { + p.log.Warnn("failed to poll workspace configs after delay", + logger.NewDurationField("delay", delay), + obskit.Error(err), + ) + } else { + break retryLoop + } + } + } + if err != nil { + p.log.Errorn("failed to poll workspace configs after backoff", + logger.NewDurationField("backoff", p.backoff.maxInterval), + obskit.Error(err), + ) + } + } + } +} + +func (p *WorkspaceConfigsPoller[K]) poll(ctx context.Context) (bool, error) { + p.log.Debugn("polling for workspace configs", logger.NewTimeField("updatedAt", p.updatedAt)) + + response := p.constructor() + err := p.getter(ctx, response, p.updatedAt) + if err != nil { + return false, fmt.Errorf("failed to get updated workspace configs: %w", err) + } + + updatedAt, updated, err := p.handler(response) + if err != nil { + return false, fmt.Errorf("failed to handle workspace configs: %w", err) + } + + if !updatedAt.IsZero() { + p.updatedAt = updatedAt + } + + return updated, nil +} + +func (p *WorkspaceConfigsPoller[K]) nextBackOff() func() time.Duration { + return backoff.WithMaxRetries(backoff.NewExponentialBackOff( + backoff.WithInitialInterval(p.backoff.initialInterval), + backoff.WithMaxInterval(p.backoff.maxInterval), + backoff.WithMaxElapsedTime(p.backoff.maxElapsedTime), + backoff.WithMultiplier(p.backoff.multiplier), + ), p.backoff.maxRetries).NextBackOff +} diff --git a/poller/poller_test.go b/poller/poller_test.go new file mode 100644 index 0000000..e13307d --- /dev/null +++ b/poller/poller_test.go @@ -0,0 +1,316 @@ +package poller + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-cp-sdk/diff" + "github.com/rudderlabs/rudder-cp-sdk/modelv2" + "github.com/rudderlabs/rudder-go-kit/logger" +) + +func TestPollerNew(t *testing.T) { + getter := func(_ context.Context, _ diff.UpdateableObject[string], _ time.Time) error { + return nil + } + handler := func(_ diff.UpdateableObject[string]) (time.Time, bool, error) { + return time.Time{}, false, nil + } + constructor := func() diff.UpdateableObject[string] { + return nil + } + + t.Run("should return error if getter is nil", func(t *testing.T) { + p, err := NewWorkspaceConfigsPoller[string](nil, handler, constructor) + require.Nil(t, p) + require.Error(t, err) + }) + + t.Run("should return error if handler is nil", func(t *testing.T) { + p, err := NewWorkspaceConfigsPoller[string](getter, nil, constructor) + require.Nil(t, p) + require.Error(t, err) + }) + + t.Run("should return error if constructor is nil", func(t *testing.T) { + p, err := NewWorkspaceConfigsPoller[string](getter, handler, nil) + require.Nil(t, p) + require.Error(t, err) + }) +} + +func TestPoller(t *testing.T) { + t.Run("should poll using getter and workspace configs handler", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + client := &mockClient{calls: []clientCall{ + { + dataToBeReturned: mockedResponses[0], + expectedUpdatedAt: time.Time{}, + }, + { + dataToBeReturned: mockedResponses[1], + expectedUpdatedAt: time.Date(2009, 11, 19, 20, 34, 58, 651387237, time.UTC), + }, + { + dataToBeReturned: mockedResponses[2], + expectedUpdatedAt: time.Date(2009, 11, 20, 20, 34, 58, 651387237, time.UTC), + }, + }} + + var wg sync.WaitGroup + wg.Add(len(mockedResponses)) + expectedResponseIndex := 0 + + getLatestUpdatedAt := getLatestUpdatedAt() + runTestPoller(t, ctx, client, func(obj diff.UpdateableObject[string]) (time.Time, bool, error) { + defer wg.Done() + + require.Equalf(t, mockedResponses[expectedResponseIndex], obj, "Response index: %d", expectedResponseIndex) + + expectedResponseIndex++ + if expectedResponseIndex == len(mockedResponses) { + cancel() + } + + return getLatestUpdatedAt(obj), true, nil + }) + + wg.Wait() + }) + + t.Run("should skip failed getter requests", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := &mockClient{calls: []clientCall{ + { + errToBeReturned: errors.New("first call failed"), + }, + { + dataToBeReturned: mockedResponses[0], + expectedUpdatedAt: time.Time{}, + }, + { + dataToBeReturned: mockedResponses[1], + expectedUpdatedAt: time.Date(2009, 11, 19, 20, 34, 58, 651387237, time.UTC), + }, + { + dataToBeReturned: mockedResponses[2], + expectedUpdatedAt: time.Date(2009, 11, 20, 20, 34, 58, 651387237, time.UTC), + }, + }} + + var wg sync.WaitGroup + wg.Add(len(mockedResponses)) + expectedResponseIndex := 0 + + getLatestUpdatedAt := getLatestUpdatedAt() + runTestPoller(t, ctx, client, func(obj diff.UpdateableObject[string]) (time.Time, bool, error) { + defer wg.Done() + + require.Equalf(t, mockedResponses[expectedResponseIndex], obj, "Response index: %d", expectedResponseIndex) + + expectedResponseIndex++ + if expectedResponseIndex == len(mockedResponses) { + cancel() + } + + return getLatestUpdatedAt(obj), true, nil + }) + + wg.Wait() + }) + + t.Run("should skip handler failures without updating updatedAt", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := &mockClient{calls: []clientCall{ + { // this will be called twice, once for the first failed handler call and once for the second + dataToBeReturned: mockedResponses[0], + expectedUpdatedAt: time.Time{}, + }, + { + dataToBeReturned: mockedResponses[0], + expectedUpdatedAt: time.Time{}, + }, + { + dataToBeReturned: mockedResponses[1], + expectedUpdatedAt: time.Date(2009, 11, 19, 20, 34, 58, 651387237, time.UTC), + }, + { + dataToBeReturned: mockedResponses[2], + expectedUpdatedAt: time.Date(2009, 11, 20, 20, 34, 58, 651387237, time.UTC), + }, + }} + + var wg sync.WaitGroup + wg.Add(len(mockedResponses)) + expectedResponseIndex := 0 + var hasReturnedError bool + // start a poller with handler that fails on first attempt and succeeds on second + getLatestUpdatedAt := getLatestUpdatedAt() + runTestPoller(t, ctx, client, func(obj diff.UpdateableObject[string]) (time.Time, bool, error) { + if !hasReturnedError { + hasReturnedError = true + return time.Time{}, false, errors.New("first call failed") + } + + expectedResponseIndex++ + if expectedResponseIndex == len(mockedResponses) { + cancel() + } + + wg.Done() + + return getLatestUpdatedAt(obj), true, nil + }) + + wg.Wait() + }) +} + +func runTestPoller( + t *testing.T, + ctx context.Context, + client *mockClient, + handler func(object diff.UpdateableObject[string]) (time.Time, bool, error), +) { + t.Helper() + + poll, err := setupPoller( + func(ctx context.Context, object any, updatedAfter time.Time) error { + return client.GetWorkspaceConfigs(ctx, object, updatedAfter) + }, + func(obj diff.UpdateableObject[string]) (time.Time, bool, error) { + return handler(obj) + }, + logger.NOP, + ) + require.NoError(t, err) + + done := make(chan struct{}) + t.Cleanup(func() { <-done }) + go func() { + poll(ctx) + close(done) + }() +} + +func setupPoller( + getter func(ctx context.Context, object any, updatedAfter time.Time) error, + handler WorkspaceConfigsHandler[string], + log logger.Logger, +) (func(context.Context), error) { + p, err := newWorkspaceConfigsPoller[string]( + func(ctx context.Context, l diff.UpdateableObject[string], updatedAfter time.Time) error { + return getter(ctx, l, updatedAfter) + }, + handler, + func() diff.UpdateableObject[string] { + return &modelv2.WorkspaceConfigs{} + }, + log, + ) + if err != nil { + return nil, fmt.Errorf("error setting up poller: %v", err) + } + + return p.Run, nil +} + +func newWorkspaceConfigsPoller[K comparable]( + getter WorkspaceConfigsGetter[K], + handler WorkspaceConfigsHandler[K], + constructor func() diff.UpdateableObject[K], + log logger.Logger, +) (*WorkspaceConfigsPoller[K], error) { + return NewWorkspaceConfigsPoller(getter, handler, constructor, + WithLogger[K](log.Child("poller")), + WithPollingInterval[K](time.Nanosecond), + WithPollingBackoffInitialInterval[K](time.Nanosecond), + WithPollingBackoffMaxInterval[K](time.Nanosecond), + WithPollingBackoffMultiplier[K](1), + ) +} + +func getLatestUpdatedAt() func(list diff.UpdateableObject[string]) time.Time { + var latestUpdatedAt time.Time + return func(obj diff.UpdateableObject[string]) time.Time { + for uo := range obj.Updateables() { + for _, wc := range uo.List() { + if wc.IsNil() || wc.GetUpdatedAt().IsZero() { + continue + } + if wc.GetUpdatedAt().After(latestUpdatedAt) { + latestUpdatedAt = wc.GetUpdatedAt() + } + } + } + return latestUpdatedAt + } +} + +type mockClient struct { + calls []clientCall + nextCall int +} + +type clientCall struct { + dataToBeReturned any + errToBeReturned error + expectedUpdatedAt time.Time +} + +func (m *mockClient) GetWorkspaceConfigs(ctx context.Context, object any, updatedAfter time.Time) error { + if m.nextCall >= len(m.calls) { + return errors.New("no more calls") + } + + call := m.calls[m.nextCall] + m.nextCall++ + + if call.expectedUpdatedAt.Nanosecond() != updatedAfter.Nanosecond() { + return errors.New("unexpected updatedAt") + } + + if call.errToBeReturned != nil { + return call.errToBeReturned + } + + type T = *modelv2.WorkspaceConfigs + *object.(T) = *call.dataToBeReturned.(T) + + return ctx.Err() +} + +var mockedResponses = []*modelv2.WorkspaceConfigs{ + { + Workspaces: map[string]*modelv2.WorkspaceConfig{ + "wc-1": {UpdatedAt: time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC)}, + "wc-2": {UpdatedAt: time.Date(2009, 11, 18, 20, 34, 58, 651387237, time.UTC)}, + "wc-3": {UpdatedAt: time.Date(2009, 11, 19, 20, 34, 58, 651387237, time.UTC)}, + }, + }, + { + Workspaces: map[string]*modelv2.WorkspaceConfig{ + "wc-1": nil, + "wc-2": {UpdatedAt: time.Date(2009, 11, 20, 20, 34, 58, 651387237, time.UTC)}, + "wc-3": nil, + }, + }, + { + Workspaces: map[string]*modelv2.WorkspaceConfig{ + "wc-1": {UpdatedAt: time.Date(2009, 11, 21, 20, 34, 58, 651387237, time.UTC)}, + "wc-4": {UpdatedAt: time.Date(2009, 11, 22, 20, 34, 58, 651387237, time.UTC)}, + }, + }, +} diff --git a/scripts/install-golangci-lint.sh b/scripts/install-golangci-lint.sh index ce2b06d..9785e8a 100755 --- a/scripts/install-golangci-lint.sh +++ b/scripts/install-golangci-lint.sh @@ -1,7 +1,7 @@ #!/bin/bash VERSION=$1 -[ -z "${VERSION}" ] && VERSION="v1.55.0" +[ -z "${VERSION}" ] && VERSION="v1.62.2" GOPATH=$(go env GOPATH) [ -f "${GOPATH}/bin/golangci-lint-${VERSION}" ] && echo "golangci-lint ${VERSION} is already installed" || \ -curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/${VERSION}/install.sh | sh -s -- -b ${GOPATH}/bin ${VERSION} && \ -cp ${GOPATH}/bin/golangci-lint ${GOPATH}/bin/golangci-lint-${VERSION} +curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/"${VERSION}"/install.sh | sh -s -- -b "${GOPATH}"/bin "${VERSION}" && \ +cp "${GOPATH}"/bin/golangci-lint "${GOPATH}"/bin/golangci-lint-"${VERSION}" diff --git a/sdk.go b/sdk.go index 29418d3..ad6188b 100644 --- a/sdk.go +++ b/sdk.go @@ -8,14 +8,10 @@ import ( "time" "github.com/rudderlabs/rudder-cp-sdk/identity" - "github.com/rudderlabs/rudder-cp-sdk/internal/cache" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/admin" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/base" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/namespace" "github.com/rudderlabs/rudder-cp-sdk/internal/clients/workspace" - "github.com/rudderlabs/rudder-cp-sdk/internal/poller" - "github.com/rudderlabs/rudder-cp-sdk/modelv2" - "github.com/rudderlabs/rudder-cp-sdk/notifications" "github.com/rudderlabs/rudder-go-kit/logger" ) @@ -37,18 +33,10 @@ type ControlPlane struct { AdminClient *admin.Client log logger.Logger - - configsCache *cache.WorkspaceConfigCache - - pollingInterval time.Duration - poller *poller.Poller - pollerStop context.CancelFunc } type Client interface { - GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) - GetCustomWorkspaceConfigs(ctx context.Context, object any) error - GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) + GetWorkspaceConfigs(ctx context.Context, object any, updatedAfter time.Time) error } type RequestDoer interface { @@ -56,14 +44,12 @@ type RequestDoer interface { } func New(options ...Option) (*ControlPlane, error) { - url, _ := url.Parse(defaultBaseUrl) - urlV2, _ := url.Parse(defaultBaseUrlV2) + baseUrl, _ := url.Parse(defaultBaseUrl) + baseUrlV2, _ := url.Parse(defaultBaseUrlV2) cp := &ControlPlane{ - baseUrl: url, - baseUrlV2: urlV2, - log: logger.NOP, - pollingInterval: 1 * time.Second, - configsCache: &cache.WorkspaceConfigCache{}, + baseUrl: baseUrl, + baseUrlV2: baseUrlV2, + log: logger.NOP, } for _, option := range options { @@ -76,10 +62,6 @@ func New(options ...Option) (*ControlPlane, error) { return nil, err } - if err := cp.setupPoller(); err != nil { - return nil, err - } - return cp, nil } @@ -118,59 +100,7 @@ func (cp *ControlPlane) setupClients() error { return nil } -func (cp *ControlPlane) setupPoller() error { - if cp.pollingInterval == 0 { - return nil - } - - handle := func(wc *modelv2.WorkspaceConfigs) error { - cp.configsCache.Set(wc) - return nil - } - - p, err := poller.New(handle, - poller.WithClient(cp.Client), - poller.WithPollingInterval(cp.pollingInterval), - poller.WithLogger(cp.log)) - if err != nil { - return err - } - - cp.poller = p - ctx, cancel := context.WithCancel(context.Background()) - cp.pollerStop = cancel - cp.poller.Start(ctx) - - return nil -} - -// Close stops any background processes such as polling for workspace configs. -func (cp *ControlPlane) Close() { - if cp.poller != nil { - cp.pollerStop() - } -} - // GetWorkspaceConfigs returns the latest workspace configs. -// If polling is enabled, this will return the cached configs, if they have been retrieved at least once. -// Otherwise, it will make a request to the control plane to get the latest configs. -func (cp *ControlPlane) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) { - if cp.poller != nil { - return cp.configsCache.Get(), nil - } else { - return cp.Client.GetWorkspaceConfigs(ctx) - } -} - -// GetCustomWorkspaceConfigs streams the JSON payload directly in the target object. It does not support for incremental updates. -func (cp *ControlPlane) GetCustomWorkspaceConfigs(ctx context.Context, object any) error { - return cp.Client.GetCustomWorkspaceConfigs(ctx, object) -} - -type Subscriber interface { - Notifications() chan notifications.WorkspaceConfigNotification -} - -func (cp *ControlPlane) Subscribe() chan notifications.WorkspaceConfigNotification { - return cp.configsCache.Subscribe() +func (cp *ControlPlane) GetWorkspaceConfigs(ctx context.Context, object any, updatedAfter time.Time) error { + return cp.Client.GetWorkspaceConfigs(ctx, object, updatedAfter) } diff --git a/sdk_test.go b/sdk_test.go new file mode 100644 index 0000000..1864aed --- /dev/null +++ b/sdk_test.go @@ -0,0 +1,228 @@ +package cpsdk + +import ( + "context" + "fmt" + "net/http" + "os" + "testing" + "time" + + "github.com/rudderlabs/rudder-cp-sdk/diff" + + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-cp-sdk/modelv2" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/testhelper/httptest" +) + +const updatedAfterTimeFormat = "2006-01-02T15:04:05.000Z" + +func TestIncrementalUpdates(t *testing.T) { + var ( + ctx = context.Background() + namespace = "free-us-1" + secret = "service-secret" + requestNumber int + receivedUpdatedAfter []time.Time + ) + + responseBodyFromFile, err := os.ReadFile("./testdata/sample_namespace.json") + require.NoError(t, err) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { requestNumber++ }() + + user, _, ok := r.BasicAuth() + require.True(t, ok) + require.Equal(t, secret, user) + + var ( + err error + updatedAfterTime time.Time + responseBody []byte + ) + for k, v := range r.URL.Query() { + if k != "updatedAfter" { + continue + } + + updatedAfterTime, err = time.Parse(updatedAfterTimeFormat, v[0]) + require.NoError(t, err) + + receivedUpdatedAfter = append(receivedUpdatedAfter, updatedAfterTime) + } + + switch requestNumber { + case 0: // 1st request, return file content as is + responseBody = responseBodyFromFile + case 1: // 2nd request, return new workspace, no updates for the other 2 + responseBody = []byte(fmt.Sprintf(`{ + "workspaces": { + "dummy":{"updatedAt":%q,"libraries":[{"versionId":"foo"},{"versionId":"bar"}]}, + "2hCBi02C8xYS8Rsy1m9bJjTlKy6":null, + "2bVMV2JiAJe42OXZrzyvJI75v0N":null + } + }`, updatedAfterTime.Add(time.Minute).Format(updatedAfterTimeFormat))) + case 2: // 3rd request, return updated dummy workspace, no updates for the other 2 + responseBody = []byte(fmt.Sprintf(`{ + "workspaces": { + "dummy":{"updatedAt":%q,"libraries":[{"versionId":"baz"}]}, + "2hCBi02C8xYS8Rsy1m9bJjTlKy6":null, + "2bVMV2JiAJe42OXZrzyvJI75v0N":null + } + }`, updatedAfterTime.Add(time.Minute).Format(updatedAfterTimeFormat))) + case 3, 4: // 4th and 5th request, delete the dummy workspace + responseBody = []byte(`{ + "workspaces": { + "2hCBi02C8xYS8Rsy1m9bJjTlKy6":null, + "2bVMV2JiAJe42OXZrzyvJI75v0N":null + } + }`) + case 5: // new workspace, but its update time is before the last request, so no updates + responseBody = []byte(`{ + "workspaces": { + "someWorkspaceID": null + } + }`) + default: + responseBody = responseBodyFromFile + } + + _, _ = w.Write(responseBody) + })) + defer ts.Close() + + cpSDK, err := New( + WithBaseUrl(ts.URL), + WithLogger(logger.NOP), + WithNamespaceIdentity(namespace, secret), + ) + require.NoError(t, err) + + getLatestUpdatedAt := getLatestUpdatedAt() // this is to cache the latestUpdatedAt + + // send the request the first time + wcs := &modelv2.WorkspaceConfigs{} + err = cpSDK.GetWorkspaceConfigs(ctx, wcs, time.Time{}) + require.NoError(t, err) + require.Len(t, wcs.Workspaces, 2) + require.Contains(t, wcs.Workspaces, "2hCBi02C8xYS8Rsy1m9bJjTlKy6") + require.NotNil(t, wcs.Workspaces["2hCBi02C8xYS8Rsy1m9bJjTlKy6"]) + require.Contains(t, wcs.Workspaces, "2bVMV2JiAJe42OXZrzyvJI75v0N") + require.NotNil(t, wcs.Workspaces["2bVMV2JiAJe42OXZrzyvJI75v0N"]) + require.Empty(t, receivedUpdatedAfter, "The first request should not have updatedAfter in the query params") + latestUpdatedAt, updatedAt := getLatestUpdatedAt(wcs) + require.Equal(t, "2024-11-27T20:13:30.647Z", latestUpdatedAt.Format(updatedAfterTimeFormat)) + require.Equal(t, "2024-11-27T20:13:30.647Z", updatedAt.Format(updatedAfterTimeFormat)) + + // send the request again, should receive the new dummy workspace and no updates for the other 2 workspaces + wcs = &modelv2.WorkspaceConfigs{} // reset the workspace configs + err = cpSDK.GetWorkspaceConfigs(ctx, wcs, latestUpdatedAt) + require.NoError(t, err) + require.Len(t, wcs.Workspaces, 3) + require.Contains(t, wcs.Workspaces, "2hCBi02C8xYS8Rsy1m9bJjTlKy6") + require.Nil(t, wcs.Workspaces["2hCBi02C8xYS8Rsy1m9bJjTlKy6"], "The workspace should have not been updated") + require.Contains(t, wcs.Workspaces, "2bVMV2JiAJe42OXZrzyvJI75v0N") + require.Nil(t, wcs.Workspaces["2bVMV2JiAJe42OXZrzyvJI75v0N"], "The workspace should have not been updated") + require.Contains(t, wcs.Workspaces, "dummy") + require.NotNil(t, wcs.Workspaces["dummy"]) + require.Len(t, receivedUpdatedAfter, 1) + latestUpdatedAt, updatedAt = getLatestUpdatedAt(wcs) + require.Equal(t, "2024-11-27T20:14:30.647Z", latestUpdatedAt.Format(updatedAfterTimeFormat)) + require.Equal(t, "2024-11-27T20:14:30.647Z", updatedAt.Format(updatedAfterTimeFormat)) + expectedUpdatedAfter, err := time.Parse(updatedAfterTimeFormat, "2024-11-27T20:13:30.647Z") + require.NoError(t, err) + require.Equal(t, receivedUpdatedAfter[0], expectedUpdatedAfter, updatedAfterTimeFormat) + + // send the request again, should receive the updated dummy workspace + wcs = &modelv2.WorkspaceConfigs{} // reset the workspace configs + err = cpSDK.GetWorkspaceConfigs(ctx, wcs, latestUpdatedAt) + require.NoError(t, err) + require.Len(t, wcs.Workspaces, 3) + require.Contains(t, wcs.Workspaces, "2hCBi02C8xYS8Rsy1m9bJjTlKy6") + require.Nil(t, wcs.Workspaces["2hCBi02C8xYS8Rsy1m9bJjTlKy6"], "The workspace should have not been updated") + require.Contains(t, wcs.Workspaces, "2bVMV2JiAJe42OXZrzyvJI75v0N") + require.Nil(t, wcs.Workspaces["2bVMV2JiAJe42OXZrzyvJI75v0N"], "The workspace should have not been updated") + require.Contains(t, wcs.Workspaces, "dummy") + require.NotNil(t, wcs.Workspaces["dummy"]) + require.Len(t, receivedUpdatedAfter, 2) + latestUpdatedAt, updatedAt = getLatestUpdatedAt(wcs) + require.Equal(t, "2024-11-27T20:15:30.647Z", latestUpdatedAt.Format(updatedAfterTimeFormat)) + require.Equal(t, "2024-11-27T20:15:30.647Z", updatedAt.Format(updatedAfterTimeFormat)) + expectedUpdatedAfter, err = time.Parse(updatedAfterTimeFormat, "2024-11-27T20:14:30.647Z") + require.NoError(t, err) + require.Equal(t, receivedUpdatedAfter[1], expectedUpdatedAfter, updatedAfterTimeFormat) + + // send the request again, should not receive dummy since it was deleted + wcs = &modelv2.WorkspaceConfigs{} // reset the workspace configs + err = cpSDK.GetWorkspaceConfigs(ctx, wcs, latestUpdatedAt) + require.NoError(t, err) + latestUpdatedAt, updatedAt = getLatestUpdatedAt(wcs) + require.Truef(t, updatedAt.IsZero(), "%+v", wcs) + require.Equal(t, "2024-11-27T20:15:30.647Z", latestUpdatedAt.Format(updatedAfterTimeFormat)) + require.Len(t, wcs.Workspaces, 2) + require.Contains(t, wcs.Workspaces, "2hCBi02C8xYS8Rsy1m9bJjTlKy6") + require.Nil(t, wcs.Workspaces["2hCBi02C8xYS8Rsy1m9bJjTlKy6"], "The workspace should have not been updated") + require.Contains(t, wcs.Workspaces, "2bVMV2JiAJe42OXZrzyvJI75v0N") + require.Nil(t, wcs.Workspaces["2bVMV2JiAJe42OXZrzyvJI75v0N"], "The workspace should have not been updated") + require.Len(t, receivedUpdatedAfter, 3) + expectedUpdatedAfter, err = time.Parse(updatedAfterTimeFormat, "2024-11-27T20:15:30.647Z") + require.NoError(t, err) + require.Equal(t, receivedUpdatedAfter[2], expectedUpdatedAfter, updatedAfterTimeFormat) + + // send the request again, the updatedAfter should be the same as the last request since no updates + wcs = &modelv2.WorkspaceConfigs{} // reset the workspace configs + err = cpSDK.GetWorkspaceConfigs(ctx, wcs, latestUpdatedAt) + require.NoError(t, err) + latestUpdatedAt, updatedAt = getLatestUpdatedAt(wcs) + require.Truef(t, updatedAt.IsZero(), "%+v", wcs) + require.Equal(t, "2024-11-27T20:15:30.647Z", latestUpdatedAt.Format(updatedAfterTimeFormat)) + require.Len(t, wcs.Workspaces, 2) + require.Contains(t, wcs.Workspaces, "2hCBi02C8xYS8Rsy1m9bJjTlKy6") + require.Nil(t, wcs.Workspaces["2hCBi02C8xYS8Rsy1m9bJjTlKy6"], "The workspace should have not been updated") + require.Contains(t, wcs.Workspaces, "2bVMV2JiAJe42OXZrzyvJI75v0N") + require.Nil(t, wcs.Workspaces["2bVMV2JiAJe42OXZrzyvJI75v0N"], "The workspace should have not been updated") + require.Len(t, receivedUpdatedAfter, 4) + expectedUpdatedAfter, err = time.Parse(updatedAfterTimeFormat, "2024-11-27T20:15:30.647Z") + require.NoError(t, err) + require.Equal(t, receivedUpdatedAfter[3], expectedUpdatedAfter, updatedAfterTimeFormat) + + // last request, ideally the application should detect that there is an inconsistency and trigger a full update + // although that behaviour is not tested here + wcs = &modelv2.WorkspaceConfigs{} // reset the workspace configs + err = cpSDK.GetWorkspaceConfigs(ctx, wcs, latestUpdatedAt) + require.NoError(t, err) + latestUpdatedAt, updatedAt = getLatestUpdatedAt(wcs) + require.Truef(t, updatedAt.IsZero(), "%+v", wcs) + require.Equal(t, "2024-11-27T20:15:30.647Z", latestUpdatedAt.Format(updatedAfterTimeFormat)) + require.Len(t, wcs.Workspaces, 1) + require.Contains(t, wcs.Workspaces, "someWorkspaceID") + require.Nil(t, wcs.Workspaces["someWorkspaceID"], "The workspace should have not been updated") + require.Len(t, receivedUpdatedAfter, 5) + expectedUpdatedAfter, err = time.Parse(updatedAfterTimeFormat, "2024-11-27T20:15:30.647Z") + require.NoError(t, err) + require.Equal(t, receivedUpdatedAfter[4], expectedUpdatedAfter, updatedAfterTimeFormat) +} + +func getLatestUpdatedAt() func(list diff.UpdateableObject[string]) (time.Time, time.Time) { + var latestUpdatedAt time.Time + return func(obj diff.UpdateableObject[string]) (time.Time, time.Time) { + var localUpdatedAt time.Time + for uo := range obj.Updateables() { + for _, wc := range uo.List() { + if wc.IsNil() || wc.GetUpdatedAt().IsZero() { + continue + } + if wc.GetUpdatedAt().After(latestUpdatedAt) { + latestUpdatedAt = wc.GetUpdatedAt() + } + if wc.GetUpdatedAt().After(localUpdatedAt) { + localUpdatedAt = wc.GetUpdatedAt() + } + } + } + return latestUpdatedAt, localUpdatedAt + } +} diff --git a/testdata/sample_namespace.json b/testdata/sample_namespace.json new file mode 100644 index 0000000..5d2c316 --- /dev/null +++ b/testdata/sample_namespace.json @@ -0,0 +1,455 @@ +{ + "workspaces": { + "2hCBi02C8xYS8Rsy1m9bJjTlKy6": { + "sources": { + "2hCDcJJGtZCIMDWjAyjsOg0W1Hr": { + "name": "Source 1", + "writeKey": "some-write-key-for-source-1", + "enabled": true, + "deleted": false, + "config": {}, + "liveEventsConfig": { + "eventUpload": false, + "eventUploadTS": 1729085754256 + }, + "createdBy": "2fdoRuXbHt7VRLU7Zt4cHjdypkR", + "transient": false, + "sourceDefinitionName": "Javascript", + "dgSourceTrackingPlanConfig": null, + "secretVersion": null, + "createdAt": "2024-05-30T17:14:03.570Z", + "updatedAt": "2024-10-16T13:35:54.256Z", + "geoEnrichment": { + "enabled": false + } + } + }, + "destinations": { + "2i9lmpOsrP9KHqDR5hAlKWfNzb5": { + "name": "Destination 1", + "enabled": true, + "config": { + "serviceAccountUserName": "", + "serviceAccountSecret": "", + "projectId": "3266642", + "dataResidency": "us", + "identityMergeApi": "original", + "userDeletionApi": "engage", + "strictMode": false, + "ignoreDnt": false, + "useUserDefinedPageEventName": true, + "useUserDefinedScreenEventName": false, + "people": true, + "setAllTraitsByDefault": true, + "consolidatedPageCalls": true, + "trackCategorizedPages": false, + "trackNamedPages": false, + "sourceName": "", + "crossSubdomainCookie": false, + "persistenceType": "cookie", + "persistenceName": "", + "secureCookie": false, + "useNewMapping": false, + "eventFilteringOption": "disable", + "userDefinedPageEventTemplate": "Page Loaded", + "useNativeSDK": { + "web": false + }, + "connectionMode": { + "web": "cloud" + }, + "oneTrustCookieCategories": { + "web": [ + { + "oneTrustCookieCategory": "" + } + ] + }, + "ketchConsentPurposes": { + "web": [ + { + "purpose": "" + } + ] + }, + "consentManagement": { + "web": [ + { + "provider": "oneTrust", + "consents": [] + }, + { + "provider": "ketch", + "consents": [] + } + ] + }, + "token": "some-fancy-token" + }, + "liveEventsConfig": {}, + "destinationDefinitionName": "MP", + "deleted": false, + "transformationIds": [], + "createdAt": "2024-06-20T19:13:55.060Z", + "updatedAt": "2024-10-01T09:28:13.436Z", + "revisionId": "2mpYH5Yaa2NbArGnp02Vq424jDb", + "secretVersion": 2 + }, + "2hQdZuO7jPzgcw1uN9NaU1u2wPg": { + "name": "Destination 2", + "enabled": true, + "config": { + "project": "a-very-cool-project", + "location": "US", + "bucketName": "rudderstack_bucket_name_1", + "prefix": "prod", + "namespace": "rudderstack_ns_prod", + "syncFrequency": "180", + "skipUsersTable": false, + "skipTracksTable": false, + "jsonPaths": "", + "connectionMode": { + "web": "cloud" + }, + "oneTrustCookieCategories": { + "web": [ + { + "oneTrustCookieCategory": "" + } + ] + }, + "underscoreDivideNumbers": true, + "allowUsersContextTraits": true, + "consentManagement": { + "web": [ + { + "provider": "oneTrust", + "consents": [] + } + ] + } + }, + "liveEventsConfig": {}, + "destinationDefinitionName": "BQ", + "deleted": false, + "transformationIds": [], + "createdAt": "2024-06-04T19:44:53.606Z", + "updatedAt": "2024-10-01T09:41:58.642Z", + "revisionId": "2mpZwj4fabdKCCOZe7BDMgTKKJq", + "secretVersion": 3 + } + }, + "connections": { + "2i9lmuu9TW8vT4YJ0rTTSMDtVJR": { + "sourceId": "2hCDcJJGtZCIMDWjAyjsOg0W1Hr", + "destinationId": "2i9lmpOsrP9KHqDR5hAlKWfNzb5", + "enabled": true, + "processorEnabled": true + }, + "2hQdZtIleMfPqvozvCYtGDdjDYa": { + "sourceId": "2hCDcJJGtZCIMDWjAyjsOg0W1Hr", + "destinationId": "2hQdZuO7jPzgcw1uN9NaU1u2wPg", + "enabled": true, + "processorEnabled": true + } + }, + "destinationTransformations": [], + "transformations": {}, + "libraries": [], + "whtProjects": {}, + "trackingPlans": {}, + "accounts": {}, + "credentials": {}, + "settings": { + "dataRetention": { + "disableReportingPii": false, + "useSelfStorage": false, + "retentionPeriod": "default", + "storagePreferences": { + "procErrors": true, + "gatewayDumps": true + } + }, + "eventAuditEnabled": false + }, + "resources": {}, + "eventReplays": {}, + "updatedAt": "2024-10-16T13:35:54.830Z" + }, + "2bVMV2JiAJe42OXZrzyvJI75v0N": { + "sources": { + "2jIY89R3Ul5PxVsynuYapfmgtfP": { + "name": "Source 2", + "writeKey": "some-write-key-for-source-2", + "enabled": true, + "deleted": false, + "config": { + "origin": "model", + "primaryKey": "uuid" + }, + "liveEventsConfig": { + "eventUpload": false, + "eventUploadTS": 1723214107805 + }, + "createdBy": "2be2oFX4OkMy3oKxJGLZQMTuqGA", + "transient": false, + "sourceDefinitionName": "bigquery", + "dgSourceTrackingPlanConfig": null, + "secretVersion": null, + "createdAt": "2024-07-15T20:38:15.071Z", + "updatedAt": "2024-08-09T14:35:07.805Z", + "geoEnrichment": { + "enabled": false + }, + "sqlModel": { + "id": "2jIY89vtlCllbBaQiew9UpxcnRl", + "accountId": "2dbIDPUfRgMcUevSImAmjSMjTZC" + } + } + }, + "destinations": { + "2nqWMUk1fNNAw4rU3bHYcm58CZZ": { + "name": "Destination 3", + "enabled": false, + "config": { + "accountId": "969696", + "isSPA": false, + "sendExperimentTrack": true, + "sendExperimentIdentify": false, + "eventFilteringOption": "blacklistedEvents", + "libraryTolerance": "2500", + "settingsTolerance": "2000", + "useExistingJquery": false, + "whitelistedEvents": [ + { + "eventName": "Page" + } + ], + "blacklistedEvents": [], + "useNativeSDK": { + "web": true + }, + "consentManagement": { + "web": [ + { + "provider": "oneTrust", + "resolutionStrategy": "", + "consents": [] + } + ] + }, + "connectionMode": { + "web": "device" + }, + "oneTrustCookieCategories": {}, + "ketchConsentPurposes": {} + }, + "liveEventsConfig": {}, + "destinationDefinitionName": "VWO", + "deleted": false, + "transformationIds": [], + "createdAt": "2024-10-23T16:30:38.111Z", + "updatedAt": "2024-11-27T20:13:30.126Z", + "revisionId": "2pRomv2tOn7OgjzUye2bIgPUKfP", + "secretVersion": 2 + } + }, + "connections": { + "2i9I6qe9ALBTu37KKLc5nN8PwZT": { + "sourceId": "2jIY89R3Ul5PxVsynuYapfmgtfP", + "destinationId": "2nqWMUk1fNNAw4rU3bHYcm58CZZ", + "enabled": true, + "processorEnabled": true + } + }, + "destinationTransformations": [ + { + "destinationId": "2nqWMUk1fNNAw4rU3bHYcm58CZZ", + "transformationId": "2iQSOW2V2u9XWNRyxbrcgPeNyVP", + "config": { + "enableForCloudMode": true, + "enableForDeviceMode": false, + "propagateEventsUntransformedOnErrorForDeviceMode": false, + "propagateEventsUntransformedOnErrorForCloudMode": false + } + } + ], + "transformations": { + "2iQSOW2V2u9XWNRyxbrcgPeNyVP": { + "versionId": "2iQgjbu7kNBAemcp373cJPVpWhf", + "liveEventsConfig": {} + } + }, + "libraries": [ + { + "versionId": "2hgzDvRrNREZMKm9emKSrWcnECv" + }, + { + "versionId": "2pLoAb3lPyHV50Kx9mwq6dfaLUU" + } + ], + "whtProjects": {}, + "trackingPlans": { + "tp_2o7altLHTkRsQMa5NKVzAbH2fQ6": { + "version": 2 + }, + "tp_2ntNQ2Uw7V6pa80Gi9eUgWWZEJz": { + "version": 4 + } + }, + "accounts": { + "2gViSzlt6hyqKmPsrdkFaC1gxke": { + "name": "Account name 1", + "role": "singer-facebook-marketing", + "options": {}, + "secret": { + "access_token": "some-access-token" + }, + "userId": "2fdoRuYbHt1HBFE7Yt4cHjdxpkZ", + "metadata": { + "userId": "10168584126310482", + "displayName": "Account name 1" + }, + "secretVersion": 4, + "rudderCategory": "source" + } + }, + "settings": { + "dataRetention": { + "disableReportingPii": false, + "useSelfStorage": false, + "retentionPeriod": "default", + "storagePreferences": { + "procErrors": true, + "gatewayDumps": true + } + }, + "eventAuditEnabled": true + }, + "updatedAt": "2024-11-27T20:13:30.647Z" + } + }, + "sourceDefinitions": { + "close_crm": { + "options": { + "isBeta": true + }, + "displayName": "Close CRM", + "category": "webhook", + "config": {}, + "createdAt": "2024-07-03T09:57:46.015Z", + "updatedAt": "2024-07-03T09:57:46.015Z", + "id": "2ijOkxneabV0DLzk0nsZUKz57ie", + "type": "cloud" + }, + "singer-klaviyo": { + "options": { + "image": "rudderstack/source-klaviyo:v7.1.1-alpha.ea2e0337e7", + "deprecated": true, + "deprecationLabel": "Klaviyo API v1 is deprecated and will be retired on June 30, 2024 . Please use Klaviyo v2 source instead." + }, + "displayName": "Klaviyo", + "category": "singer-protocol", + "config": null, + "createdAt": "2022-02-01T10:51:18.705Z", + "updatedAt": "2024-07-03T09:57:50.353Z", + "id": "24VPFifQ0rEgLgrUzlM71Xeq0uw", + "type": "cloudSource" + } + }, + "destinationDefinitions": { + "LINKEDIN_ADS": { + "displayName": "Linkedin Ads", + "config": { + "auth": { + "role": "linkedin_ads", + "type": "OAuth", + "rudderScopes": [ + "delivery" + ] + }, + "destConfig": { + "web": [ + "connectionMode", + "oneTrustCookieCategories", + "ketchConsentPurposes" + ] + }, + "secretKeys": [], + "cdkV2Enabled": true, + "transformAtV1": "router", + "supportedSourceTypes": [ + "web" + ], + "saveDestinationResponse": true, + "supportedConnectionModes": { + "web": [ + "cloud" + ] + }, + "supportedMessageTypes": [ + "track" + ] + }, + "responseRules": {}, + "category": null, + "options": { + "isBeta": true + }, + "createdAt": "2024-04-04T09:33:25.123Z", + "updatedAt": "2024-09-19T10:35:41.520Z", + "id": "2ed8gUO1eIa0Hx2HT38WxINskMI" + }, + "SFTP": { + "displayName": "SFTP", + "config": { + "destConfig": { + "warehouse": [ + "oneTrustCookieCategories", + "ketchConsentPurposes" + ], + "defaultConfig": [ + "host", + "port", + "authMethod", + "username", + "password", + "privateKey", + "fileFormat", + "filePath" + ] + }, + "secretKeys": [ + "password", + "privateKey" + ], + "transformAtV1": "none", + "syncBehaviours": [ + "mirror" + ], + "disableJsonMapper": true, + "supportedSourceTypes": [ + "warehouse" + ], + "supportsVisualMapper": true, + "saveDestinationResponse": true, + "supportedConnectionModes": { + "warehouse": [ + "cloud" + ] + }, + "supportedMessageTypes": [ + "record" + ] + }, + "responseRules": {}, + "category": null, + "options": { + "isBeta": true + }, + "createdAt": "2024-05-16T10:45:48.349Z", + "updatedAt": "2024-09-19T10:35:09.049Z", + "id": "2gXufoBROA1QPTbcDeM2SwQxow4" + } + } +} \ No newline at end of file