Skip to content

Commit

Permalink
chore: source pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jan 31, 2024
2 parents ecea43e + b63a268 commit 6bf7dcf
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 34 deletions.
16 changes: 8 additions & 8 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (

"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/notifications"
"github.com/rudderlabs/rudder-cp-sdk/subscriber"
)

type WorkspaceConfigCache struct {
configs *modelv2.WorkspaceConfigs
updateLock sync.Mutex
subscribers []*subscriber.Subscriber
subscribers []chan notifications.WorkspaceConfigNotification
subscriberLock sync.Mutex
}

Expand Down Expand Up @@ -44,8 +43,8 @@ func (c *WorkspaceConfigCache) Set(configs *modelv2.WorkspaceConfigs) {
c.subscriberLock.Lock()
defer c.subscriberLock.Unlock()

for _, s := range c.subscribers {
s.Notify(notifications.WorkspaceConfigNotification{})
for _, subscriber := range c.subscribers {
subscriber <- notifications.WorkspaceConfigNotification{}
}
}

Expand Down Expand Up @@ -87,14 +86,15 @@ func (c *WorkspaceConfigCache) merge(configs *modelv2.WorkspaceConfigs) {
// Subscribers are notified in the order they are subscribed.
// They can monitor for updates by reading from the notifications channel, provided by the Notifications function.
// It is expected to handle any notifications in a timely manner, otherwise it will block the cache from updating.
func (c *WorkspaceConfigCache) Subscribe() *subscriber.Subscriber {
func (c *WorkspaceConfigCache) Subscribe() chan notifications.WorkspaceConfigNotification {
c.subscriberLock.Lock()
defer c.subscriberLock.Unlock()

s := subscriber.New()
c.subscribers = append(c.subscribers, s)
subscriber := make(chan notifications.WorkspaceConfigNotification)

return s
c.subscribers = append(c.subscribers, subscriber)

return subscriber
}

func copyConfigs(c *modelv2.WorkspaceConfigs) *modelv2.WorkspaceConfigs {
Expand Down
2 changes: 1 addition & 1 deletion internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestCacheSubscriptions(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
for range s.Notifications() {
for range s {
configs = append(configs, c.Get())
wg.Done()
if len(configs) == 2 {
Expand Down
3 changes: 1 addition & 2 deletions sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/rudderlabs/rudder-cp-sdk/internal/poller"
"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/notifications"
"github.com/rudderlabs/rudder-cp-sdk/subscriber"
"github.com/rudderlabs/rudder-go-kit/logger"
)

Expand Down Expand Up @@ -159,6 +158,6 @@ type Subscriber interface {
Notifications() chan notifications.WorkspaceConfigNotification
}

func (cp *ControlPlane) Subscribe() *subscriber.Subscriber {
func (cp *ControlPlane) Subscribe() chan notifications.WorkspaceConfigNotification {
return cp.configsCache.Subscribe()
}
23 changes: 0 additions & 23 deletions subscriber/subscriber.go

This file was deleted.

0 comments on commit 6bf7dcf

Please sign in to comment.