Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: incremental updates with updatedAfter #71

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
101e550
chore: initial clean-up
fracasula Dec 2, 2024
8467ada
feat: updatedAfter
fracasula Dec 3, 2024
498cab1
chore: updating golangci-lint
fracasula Dec 3, 2024
efb7574
test: incremental updates
fracasula Dec 3, 2024
60f3f25
feat: poller backoff
fracasula Dec 3, 2024
c592c9a
fix: no poller
fracasula Dec 3, 2024
617a496
chore: adding TODO
fracasula Dec 3, 2024
e5bcdee
chore: adding TODO + sample app changes
fracasula Dec 4, 2024
4fc410a
feat: diff support
fracasula Dec 4, 2024
f156284
chore: updating artifacts
fracasula Dec 4, 2024
91a7aed
chore: grouping TODOs
fracasula Dec 4, 2024
53e8a10
feat: polling on-response callback
fracasula Dec 5, 2024
425666d
chore: refactoring
fracasula Dec 5, 2024
06fc7fa
chore: better example with mutex
fracasula Dec 5, 2024
0223eed
chore: adding comment
fracasula Dec 5, 2024
aaac34a
test: workspace configs
fracasula Dec 5, 2024
69e15a3
test: poller
fracasula Dec 5, 2024
00dd89f
test: sdk
fracasula Dec 5, 2024
94967d8
fix: lint
fracasula Dec 5, 2024
9ad4bf6
chore: update cache only when needed
fracasula Dec 6, 2024
72b174f
chore: simplifying approach (#72)
fracasula Dec 10, 2024
5f18774
chore: fix template
fracasula Dec 10, 2024
32a2399
fix: non updateables
fracasula Dec 10, 2024
2ae67b0
chore: adding TODO
fracasula Dec 10, 2024
b355760
chore: poller no delay 1st time
fracasula Dec 10, 2024
889d1c7
chore: backoff options
fracasula Dec 10, 2024
33baeb6
chore: removing unnecessary reference
fracasula Dec 10, 2024
19409f7
test: non updateables
fracasula Dec 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## Linear Ticket

< Replace_with_Linear_Link >
< Linear_Link >

## Security

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '~1.21'
go-version: '~1.23'
check-latest: true
- run: go version
- run: go mod download
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions/setup-go@v5
with:
check-latest: true
go-version: '~1.21'
go-version: '~1.23'
- run: go version

- run: go mod tidy
Expand All @@ -38,10 +38,10 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '~1.21'
go-version: '~1.23'
check-latest: true
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.55.2
version: v1.62.2
args: -v
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
run:
timeout: 7m
go: '1.21'
go: '1.23'

linters:
enable:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ install-tools:
go install github.com/golang/mock/[email protected]
go install mvdan.cc/gofumpt@latest
go install golang.org/x/tools/cmd/goimports@latest
bash ./scripts/install-golangci-lint.sh v1.55.0
bash ./scripts/install-golangci-lint.sh v1.62.2

.PHONY: lint
lint: fmt ## Run linters on all go files
Expand Down
34 changes: 5 additions & 29 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package cpsdk
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
)

// BenchmarkGetWorkspaceConfigs on free-us-1, last time it showed 371MB of allocations with this technique
func BenchmarkGetWorkspaceConfigs(b *testing.B) {
conf := config.New()
baseURL := conf.GetString("BASE_URL", "https://api.rudderstack.com")
baseURL := conf.GetString("BASE_URL", "https://dp.api.rudderstack.com/")
namespace := conf.GetString("NAMESPACE", "free-us-1")
identity := conf.GetString("IDENTITY", "")
if identity == "" {
Expand All @@ -24,39 +24,15 @@ func BenchmarkGetWorkspaceConfigs(b *testing.B) {
cpSDK, err := New(
WithBaseUrl(baseURL),
WithLogger(logger.NOP),
WithPollingInterval(0), // Setting the poller interval to 0 to disable the poller
WithNamespaceIdentity(namespace, identity),
)
require.NoError(b, err)
defer cpSDK.Close()

_, err = cpSDK.GetWorkspaceConfigs(context.Background())
require.NoError(b, err)
}

// BenchmarkGetCustomWorkspaceConfigs on free-us-1, last time it showed 88MB of allocations with this technique
func BenchmarkGetCustomWorkspaceConfigs(b *testing.B) {
conf := config.New()
baseURL := conf.GetString("BASE_URL", "https://api.rudderstack.com")
namespace := conf.GetString("NAMESPACE", "free-us-1")
identity := conf.GetString("IDENTITY", "")
if identity == "" {
b.Skip("IDENTITY is not set")
return
}

cpSDK, err := New(
WithBaseUrl(baseURL),
WithLogger(logger.NOP),
WithPollingInterval(0), // Setting the poller interval to 0 to disable the poller
WithNamespaceIdentity(namespace, identity),
)
require.NoError(b, err)
defer cpSDK.Close()

var workspaceConfigs WorkspaceConfigs
err = cpSDK.GetCustomWorkspaceConfigs(context.Background(), &workspaceConfigs)
err = cpSDK.GetWorkspaceConfigs(context.Background(), &workspaceConfigs, time.Time{})
require.NoError(b, err)
require.NotNil(b, workspaceConfigs)
require.Greater(b, len(workspaceConfigs.Workspaces), 0)
}

type WorkspaceConfigs struct {
Expand Down
106 changes: 92 additions & 14 deletions cmd/sampleapp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,36 @@ import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"time"

cpsdk "github.com/rudderlabs/rudder-cp-sdk"
"github.com/rudderlabs/rudder-cp-sdk/diff"
"github.com/rudderlabs/rudder-cp-sdk/identity"
"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/poller"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
)

var log logger.Logger

func main() {
// setup a logger using the rudder-go-kit package
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

c := config.New()
loggerFactory := logger.NewFactory(c)
log = loggerFactory.NewLogger()
log := loggerFactory.NewLogger()

if err := run(); err != nil {
log.Fatalf("main error: %v", err)
if err := run(ctx, log); err != nil {
log.Fataln("main error", obskit.Error(err))
os.Exit(1)
}
}

// setupControlPlaneSDK creates a new SDK instance using the environment variables
func setupControlPlaneSDK() (*cpsdk.ControlPlane, error) {
func setupControlPlaneSDK(log logger.Logger) (*cpsdk.ControlPlane, error) {
apiUrl := os.Getenv("CPSDK_API_URL")
workspaceToken := os.Getenv("CPSDK_WORKSPACE_TOKEN")
namespace := os.Getenv("CPSDK_NAMESPACE")
Expand Down Expand Up @@ -59,18 +67,88 @@ func setupControlPlaneSDK() (*cpsdk.ControlPlane, error) {
return cpsdk.New(options...)
}

// run is the main function that uses the SDK
func run() error {
sdk, err := setupControlPlaneSDK()
// setupWorkspaceConfigsPoller wires the given getter, handler and constructor into a
// poller.WorkspaceConfigsPoller configured with the sample application's polling and backoff settings.
// The poller logs through a "poller" child of log, and every polling outcome is reported on log as well.
func setupWorkspaceConfigsPoller[K comparable](
	getter poller.WorkspaceConfigsGetter[K],
	handler poller.WorkspaceConfigsHandler[K],
	constructor func() diff.UpdateableObject[K],
	log logger.Logger,
) (*poller.WorkspaceConfigsPoller[K], error) {
	pollerLog := log.Child("poller")

	// reportPoll is invoked by the poller with the outcome of each polling attempt.
	reportPoll := func(updated bool, err error) {
		if err != nil {
			// e.g. bump metric on failure
			log.Errorn("failed to poll workspace configs", obskit.Error(err))
			return
		}
		// e.g. bump metric on success
		log.Debugn("successfully polled workspace configs", logger.NewBoolField("updated", updated))
	}

	return poller.NewWorkspaceConfigsPoller(
		getter, handler, constructor,
		poller.WithLogger[K](pollerLog),
		poller.WithPollingInterval[K](1*time.Second),
		poller.WithPollingBackoffInitialInterval[K](1*time.Second),
		poller.WithPollingBackoffMaxInterval[K](1*time.Minute),
		poller.WithPollingMaxElapsedTime[K](5*time.Minute),
		poller.WithPollingMaxRetries[K](15),
		poller.WithPollingBackoffMultiplier[K](1.5),
		poller.WithOnResponse[K](reportPoll),
	)
}

func setupClientWithPoller(
cache diff.UpdateableObject[string],
cacheMu *sync.RWMutex,
log logger.Logger,
) (func(context.Context), error) {
sdk, err := setupControlPlaneSDK(log)
if err != nil {
return fmt.Errorf("error setting up control plane sdk: %v", err)
return nil, fmt.Errorf("error setting up control plane sdk: %v", err)
}
defer sdk.Close()

_, err = sdk.Client.GetWorkspaceConfigs(context.Background())
updater := diff.Updater[string]{}

p, err := setupWorkspaceConfigsPoller[string](
func(ctx context.Context, l diff.UpdateableObject[string], updatedAfter time.Time) error {
return sdk.GetWorkspaceConfigs(ctx, l, updatedAfter)
},
func(obj diff.UpdateableObject[string]) (time.Time, bool, error) {
cacheMu.Lock()
defer cacheMu.Unlock()
return updater.UpdateCache(obj, cache)
},
func() diff.UpdateableObject[string] {
return &modelv2.WorkspaceConfigs{}
},
log,
)
if err != nil {
return fmt.Errorf("error getting workspace configs: %v", err)
return nil, fmt.Errorf("error setting up poller: %v", err)
}

return p.Run, nil
}

// run is the main function that uses the SDK.
//
// It creates the workspace configs cache together with the mutex guarding it, starts the poller in a
// background goroutine and then blocks until the poller has stopped, which happens once ctx is cancelled.
// An error is returned only if the client or the poller cannot be set up.
func run(ctx context.Context, log logger.Logger) error {
	var (
		// WARNING: if you don't want to use modelv2.WorkspaceConfigs because you're interested in a smaller subset of
		// the data, then have a look at diff_test.go for an example of how to implement a custom UpdateableList.
		cache   = &modelv2.WorkspaceConfigs{}
		cacheMu = &sync.RWMutex{}
	)

	poll, err := setupClientWithPoller(cache, cacheMu, log)
	if err != nil {
		return fmt.Errorf("error setting up client with poller: %w", err)
	}

	pollingDone := make(chan struct{})
	go func() {
		// Signal completion even if poll panics or returns early.
		defer close(pollingDone)
		poll(ctx) // blocking call, runs until context is cancelled
	}()

	cacheMu.RLock()
	// do something with "cache" here
	cacheMu.RUnlock() // nolint:staticcheck

	// Keep the process alive until the poller goroutine is done; returning earlier would make main exit
	// before the poller had a chance to fetch anything.
	<-pollingDone

	return nil
}
133 changes: 133 additions & 0 deletions diff/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package diff

import (
"fmt"
"iter"
"time"
)

// UpdateableObject is a container of keyed lists that Updater.UpdateCache can merge into a cache.
// It exposes two groups of lists: updateable ones, whose elements are merged using their nil-ness and
// update timestamp, and non-updateable ones, which are replaced wholesale on every UpdateCache call.
type UpdateableObject[K comparable] interface {
// Updateables yields the lists whose elements implement UpdateableElement.
Updateables() iter.Seq[UpdateableList[K, UpdateableElement]]
// NonUpdateables yields the lists that are copied over in full from the response to the cache.
NonUpdateables() iter.Seq[NonUpdateablesList[K, any]]
}

// UpdateableElement is a single entry of an UpdateableList.
type UpdateableElement interface {
// GetUpdatedAt returns the element's update timestamp. UpdateCache keeps track of the most recent
// timestamp among the non-nil elements of a response.
GetUpdatedAt() time.Time
// IsNil reports whether the element holds no value. UpdateCache interprets a nil element as
// "not updated" and substitutes the cached element stored under the same key.
IsNil() bool
}

// UpdateableList is a keyed collection of UpdateableElement values that can be merged incrementally.
type UpdateableList[K comparable, T UpdateableElement] interface {
// Type names the list; UpdateCache pairs a list from the response with the cached list that
// reports the same Type.
Type() string
// Length presumably returns the number of elements held by the list.
Length() int
// Reset is called by UpdateCache on the cached list right before the list is refilled from the
// response; presumably it removes every element.
Reset()
// List yields the key/element pairs of the list.
List() iter.Seq2[K, T]
// GetElementByKey returns the element stored under id and whether it was present.
GetElementByKey(id K) (T, bool)
// SetElementByKey stores object under id.
SetElementByKey(id K, object T)
}

// NonUpdateablesList is a keyed collection that is not merged element by element: on every
// UpdateCache call the cached list is reset and refilled with the content of the list from the response.
type NonUpdateablesList[K comparable, T any] interface {
// Type names the list; it is used to pair a list from the response with the cached list that
// reports the same Type.
Type() string
// Reset is called on the cached list right before it is refilled; presumably it removes every element.
Reset()
// SetElementByKey stores object under id.
SetElementByKey(id K, object T)
// List yields the key/value pairs of the list.
List() iter.Seq2[K, T]
}

// Updater merges UpdateableObject responses into a cache through UpdateCache and remembers the
// latest update timestamp it has observed. The zero value is ready to use.
type Updater[K comparable] struct {
// latestUpdatedAt is the most recent GetUpdatedAt value among the non-nil elements of the last
// successfully handled response that contained at least one; UpdateCache returns it.
latestUpdatedAt time.Time
}

// UpdateCache merges the response "new" into "cache".
//
// For every updateable list in "new" the list with the same Type is looked up in "cache". Elements that
// are nil in "new" are considered unchanged and are filled in from the cache, so that after the merge
// "new" holds the complete, current content of the list. The cached list is then replaced by that content
// when either:
//   - at least one element in "new" was not nil (i.e. it was created or updated), or
//   - the two lists differ in length, meaning that keys present in the cache are missing from "new"
//     (i.e. they were removed upstream) even though no remaining element changed.
//
// Non-updateable lists are always replaced wholesale.
//
// It returns the latest update timestamp observed so far, whether the content of at least one updateable
// list in the cache was replaced, and an error if a list or an unchanged element cannot be found in the
// cache. On error the stored timestamp is left untouched so that no update is missed by the next request.
func (u *Updater[K]) UpdateCache(new, cache UpdateableObject[K]) (time.Time, bool, error) {
	var (
		atLeastOneUpdate bool
		latestUpdatedAt  time.Time
	)

	for n := range new.Updateables() {
		var (
			found, updated bool
			c              UpdateableList[K, UpdateableElement]
		)
		for c = range cache.Updateables() {
			if n.Type() == c.Type() {
				found = true
				break
			}
		}
		if !found {
			return time.Time{}, false, fmt.Errorf(`cannot find updateable list of type %q in cache`, n.Type())
		}

		for k, v := range n.List() {
			if v.IsNil() {
				// a nil element was not updated: carry over the value we already have in the cache
				cachedValue, ok := c.GetElementByKey(k)
				if !ok {
					return time.Time{}, false, fmt.Errorf(`value "%v" in %q was not updated but was not present in cache`, k, n.Type())
				}

				if cachedValue.IsNil() {
					return time.Time{}, false, fmt.Errorf(`value "%v" in %q was not updated but was nil in cache`, k, n.Type())
				}

				n.SetElementByKey(k, cachedValue)

				continue
			}

			updated = true // at least one value in "new" was not null, thus it was updated

			if v.GetUpdatedAt().After(latestUpdatedAt) {
				latestUpdatedAt = v.GetUpdatedAt()
			}
		}

		// When nothing was updated every key of "new" is guaranteed to exist in the cache (see the checks
		// above), so a length mismatch can only mean that the cache holds keys that are gone from "new".
		// Those removals must be propagated as well, otherwise the cache keeps stale elements forever.
		if updated || n.Length() != c.Length() {
			atLeastOneUpdate = true

			c.Reset()
			// we need to iterate over the cache too because we can't simply do "cache = new", since it's an interface
			// it won't persist after the function ends.
			for k, v := range n.List() {
				c.SetElementByKey(k, v)
			}
		}
	}

	if err := u.replaceNonUpdateables(new, cache); err != nil {
		return time.Time{}, false, err
	}

	// only update updatedAt if we managed to handle the response
	// so that we don't miss any updates in case of an error
	if !latestUpdatedAt.IsZero() {
		// There is a case where all workspaces have not been updated since the last request.
		// In that case updatedAt will be zero.
		u.latestUpdatedAt = latestUpdatedAt
	}

	return u.latestUpdatedAt, atLeastOneUpdate, nil
}

// replaceNonUpdateables overwrites every non-updateable list of cache with the content of the list of
// the same Type found in new. It fails if cache has no list matching one of the lists in new.
func (u *Updater[K]) replaceNonUpdateables(new, cache UpdateableObject[K]) error {
	for incoming := range new.NonUpdateables() {
		var (
			target  NonUpdateablesList[K, any]
			matched bool
		)
		// look for the cached list that corresponds to the incoming one
		for candidate := range cache.NonUpdateables() {
			if incoming.Type() != candidate.Type() {
				continue
			}
			target, matched = candidate, true
			break
		}
		if !matched {
			return fmt.Errorf(`cannot find non updateable list of type %q in cache`, incoming.Type())
		}

		// drop whatever was cached and copy the incoming content over
		target.Reset()
		for key, value := range incoming.List() {
			target.SetElementByKey(key, value)
		}
	}

	return nil
}
Loading
Loading