From 4adc6cf17101e5dadb5c6aecf125c0a07e88581d Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Wed, 31 Jan 2024 19:59:23 +0530 Subject: [PATCH] chore: remove subscriber (#16) --- internal/cache/cache.go | 11 +++++------ internal/cache/cache_test.go | 2 +- internal/cache/subscriber.go | 17 ----------------- sdk.go | 2 +- 4 files changed, 7 insertions(+), 25 deletions(-) delete mode 100644 internal/cache/subscriber.go diff --git a/internal/cache/cache.go b/internal/cache/cache.go index a1214e9..f181e8a 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -10,7 +10,7 @@ import ( type WorkspaceConfigCache struct { configs *modelv2.WorkspaceConfigs updateLock sync.Mutex - subscribers []*Subscriber + subscribers []chan notifications.WorkspaceConfigNotification subscriberLock sync.Mutex } @@ -44,7 +44,7 @@ func (c *WorkspaceConfigCache) Set(configs *modelv2.WorkspaceConfigs) { defer c.subscriberLock.Unlock() for _, subscriber := range c.subscribers { - subscriber.notify(notifications.WorkspaceConfigNotification{}) + subscriber <- notifications.WorkspaceConfigNotification{} } } @@ -86,13 +86,12 @@ func (c *WorkspaceConfigCache) merge(configs *modelv2.WorkspaceConfigs) { // Subscribers are notified in the order they are subscribed. // They can monitor for updates by reading from the notifications channel, provided by the Notifications function. // It is expected to handle any notifications in a timely manner, otherwise it will block the cache from updating. -func (c *WorkspaceConfigCache) Subscribe() *Subscriber { +func (c *WorkspaceConfigCache) Subscribe() chan notifications.WorkspaceConfigNotification { c.subscriberLock.Lock() defer c.subscriberLock.Unlock() - subscriber := &Subscriber{ - notifications: make(chan notifications.WorkspaceConfigNotification), - } + subscriber := make(chan notifications.WorkspaceConfigNotification) + c.subscribers = append(c.subscribers, subscriber) return subscriber diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go index ae03a49..71801a1 100644 --- a/internal/cache/cache_test.go +++ b/internal/cache/cache_test.go @@ -30,7 +30,7 @@ func TestCacheSubscriptions(t *testing.T) { wg := sync.WaitGroup{} wg.Add(2) go func() { - for range s.Notifications() { + for range s { configs = append(configs, c.Get()) wg.Done() if len(configs) == 2 { diff --git a/internal/cache/subscriber.go b/internal/cache/subscriber.go deleted file mode 100644 index 1f7e85e..0000000 --- a/internal/cache/subscriber.go +++ /dev/null @@ -1,17 +0,0 @@ -package cache - -import "github.com/rudderlabs/rudder-control-plane-sdk/notifications" - -// Subscriber is a subscriber to workspace config updates. -type Subscriber struct { - notifications chan notifications.WorkspaceConfigNotification -} - -func (s *Subscriber) notify(n notifications.WorkspaceConfigNotification) { - s.notifications <- n -} - -// Notifications returns a channel that will be notified of any updates to the workspace configs. -func (s *Subscriber) Notifications() chan notifications.WorkspaceConfigNotification { - return s.notifications -} diff --git a/sdk.go b/sdk.go index 18311cd..19eddf4 100644 --- a/sdk.go +++ b/sdk.go @@ -158,6 +158,6 @@ type Subscriber interface { Notifications() chan notifications.WorkspaceConfigNotification } -func (cp *ControlPlane) Subscribe() *cache.Subscriber { +func (cp *ControlPlane) Subscribe() chan notifications.WorkspaceConfigNotification { return cp.configsCache.Subscribe() }