diff --git a/pubsub/aws/snssqs/subscription_mgmt.go b/pubsub/aws/snssqs/subscription_mgmt.go index 843147e441..c868d9dccc 100644 --- a/pubsub/aws/snssqs/subscription_mgmt.go +++ b/pubsub/aws/snssqs/subscription_mgmt.go @@ -19,8 +19,6 @@ const ( Unsubscribe ) -var initOnce sync.Once - type SubscriptionTopicHandler struct { topic string requestTopic string @@ -41,6 +39,7 @@ type SubscriptionManager struct { topicsHandlers *xsync.MapOf[string, *SubscriptionTopicHandler] lock sync.Mutex wg sync.WaitGroup + initOnce sync.Once } type SubscriptionManagement interface { @@ -67,7 +66,7 @@ func createQueueConsumerCbk(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk } func (sm *SubscriptionManager) Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk func(context.Context, *sqsQueueInfo, *sqsQueueInfo)) { - initOnce.Do(func() { + sm.initOnce.Do(func() { queueConsumerCbk := createQueueConsumerCbk(queueInfo, dlqInfo, cbk) go sm.queueConsumerController(queueConsumerCbk) sm.logger.Debug("Subscription manager initialized")