diff --git a/internal/cache/cache.go b/internal/cache/cache.go index d85595d..6daf712 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -5,13 +5,12 @@ import ( "github.com/rudderlabs/rudder-cp-sdk/modelv2" "github.com/rudderlabs/rudder-cp-sdk/notifications" - "github.com/rudderlabs/rudder-cp-sdk/subscriber" ) type WorkspaceConfigCache struct { configs *modelv2.WorkspaceConfigs updateLock sync.Mutex - subscribers []*subscriber.Subscriber + subscribers []chan notifications.WorkspaceConfigNotification subscriberLock sync.Mutex } @@ -44,8 +43,8 @@ func (c *WorkspaceConfigCache) Set(configs *modelv2.WorkspaceConfigs) { c.subscriberLock.Lock() defer c.subscriberLock.Unlock() - for _, s := range c.subscribers { - s.Notify(notifications.WorkspaceConfigNotification{}) + for _, subscriber := range c.subscribers { + subscriber <- notifications.WorkspaceConfigNotification{} } } @@ -87,14 +86,15 @@ func (c *WorkspaceConfigCache) merge(configs *modelv2.WorkspaceConfigs) { // Subscribers are notified in the order they are subscribed. // They can monitor for updates by reading from the notifications channel, provided by the Notifications function. // It is expected to handle any notifications in a timely manner, otherwise it will block the cache from updating. -func (c *WorkspaceConfigCache) Subscribe() *subscriber.Subscriber { +func (c *WorkspaceConfigCache) Subscribe() chan notifications.WorkspaceConfigNotification { c.subscriberLock.Lock() defer c.subscriberLock.Unlock() - s := subscriber.New() - c.subscribers = append(c.subscribers, s) + subscriber := make(chan notifications.WorkspaceConfigNotification) - return s + c.subscribers = append(c.subscribers, subscriber) + + return subscriber } func copyConfigs(c *modelv2.WorkspaceConfigs) *modelv2.WorkspaceConfigs { diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go index 34747a8..848f80b 100644 --- a/internal/cache/cache_test.go +++ b/internal/cache/cache_test.go @@ -30,7 +30,7 @@ func TestCacheSubscriptions(t *testing.T) { wg := sync.WaitGroup{} wg.Add(2) go func() { - for range s.Notifications() { + for range s { configs = append(configs, c.Get()) wg.Done() if len(configs) == 2 { diff --git a/sdk.go b/sdk.go index e2a2e39..c1cdd1b 100644 --- a/sdk.go +++ b/sdk.go @@ -16,7 +16,6 @@ import ( "github.com/rudderlabs/rudder-cp-sdk/internal/poller" "github.com/rudderlabs/rudder-cp-sdk/modelv2" "github.com/rudderlabs/rudder-cp-sdk/notifications" - "github.com/rudderlabs/rudder-cp-sdk/subscriber" "github.com/rudderlabs/rudder-go-kit/logger" ) @@ -159,6 +158,6 @@ type Subscriber interface { Notifications() chan notifications.WorkspaceConfigNotification } -func (cp *ControlPlane) Subscribe() *subscriber.Subscriber { +func (cp *ControlPlane) Subscribe() chan notifications.WorkspaceConfigNotification { return cp.configsCache.Subscribe() } diff --git a/subscriber/subscriber.go b/subscriber/subscriber.go deleted file mode 100644 index 4600df6..0000000 --- a/subscriber/subscriber.go +++ /dev/null @@ -1,23 +0,0 @@ -package subscriber - -import "github.com/rudderlabs/rudder-cp-sdk/notifications" - -// Subscriber is a subscriber to workspace config updates. -type Subscriber struct { - notifications chan notifications.WorkspaceConfigNotification -} - -func New() *Subscriber { - return &Subscriber{ - notifications: make(chan notifications.WorkspaceConfigNotification), - } -} - -func (s *Subscriber) Notify(n notifications.WorkspaceConfigNotification) { - s.notifications <- n -} - -// Notifications returns a channel that will be notified of any updates to the workspace configs. -func (s *Subscriber) Notifications() chan notifications.WorkspaceConfigNotification { - return s.notifications -}