Skip to content

Commit

Permalink
switch on change event action
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
  • Loading branch information
amimimor committed Oct 26, 2023
1 parent 63a5b9a commit 163489b
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions pubsub/aws/snssqs/subscription_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func createQueueConsumerCbk(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk

func (sm *SubscriptionManager) Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk func(context.Context, *sqsQueueInfo, *sqsQueueInfo)) {
initOnce.Do(func() {
sm.logger.Debug("initializing subscription manager")
sm.logger.Debug("Initializing subscription manager")
queueConsumerCbk := createQueueConsumerCbk(queueInfo, dlqInfo, cbk)
go subscriptionMgmtInst.queueConsumerController(queueConsumerCbk)
sm.logger.Debug("subscription manager initialized")
sm.logger.Debug("Subscription manager initialized")
})
}

Expand All @@ -91,35 +91,35 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
select {
case changeEvent := <-sm.topicsChangeCh:
topic := changeEvent.handler.topic
sm.logger.Debugf("subscription change event received with action: %v, on topic: %s", changeEvent.action, topic)
sm.logger.Debugf("Subscription change event received with action: %v, on topic: %s", changeEvent.action, topic)
// topic change events are serialized so that no interleaving can occur
sm.lock.Lock()
// although we have a lock here, the topicsHandlers map is thread safe and can be accessed concurrently so other subscribers that are already consuming messages
// can get the handler for the topic while we're still updating the map without blocking them
current := sm.topicsHandlers.Size()

if changeEvent.action == Subscribe {
switch changeEvent.action {
case Subscribe:
sm.topicsHandlers.Store(topic, changeEvent.handler)

// if before we've added the subscription there were no subscriptions, this subscribe signals us to start consuming from sqs
if current == 0 {
var subCtx context.Context
// create a new context for sqs consumption with a cancel func to be used when we unsubscribe from all topics
subCtx, sm.consumeCancelFunc = context.WithCancel(ctx)
// start sqs consumption
sm.logger.Info("starting sqs consumption")
sm.logger.Info("Starting sqs consumption")
go queueConsumerCbk(subCtx)
}
} else if changeEvent.action == Unsubscribe {
case Unsubscribe:
sm.topicsHandlers.Delete(topic)
// for idempotency, we check the size of the map after the delete operation, as we might have already deleted the subscription
afterDelete := sm.topicsHandlers.Size()
// if before we've removed this subscription we had one (last) subscription, this signals us to stop sqs consumption
if current == 1 && afterDelete == 0 {
sm.logger.Info("last subscription removed. no more handlers are mapped to topics. stopping sqs consumption")
sm.logger.Info("Last subscription removed. no more handlers are mapped to topics. stopping sqs consumption")
sm.consumeCancelFunc()
}
}

sm.lock.Unlock()
case <-sm.closeCh:
return
Expand All @@ -128,7 +128,7 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
}

func (sm *SubscriptionManager) Subscribe(topicHandler *SubscriptionTopicHandler) {
sm.logger.Debug("subscribing to topic: ", topicHandler.topic)
sm.logger.Debug("Subscribing to topic: ", topicHandler.topic)

sm.wg.Add(1)
go func() {
Expand All @@ -138,7 +138,7 @@ func (sm *SubscriptionManager) Subscribe(topicHandler *SubscriptionTopicHandler)
}

func (sm *SubscriptionManager) createSubscribeListener(topicHandler *SubscriptionTopicHandler) {
sm.logger.Debug("creating a subscribe listener for topic: ", topicHandler.topic)
sm.logger.Debug("Creating a subscribe listener for topic: ", topicHandler.topic)

sm.topicsChangeCh <- changeSubscriptionTopicHandler{Subscribe, topicHandler}
closeCh := make(chan struct{})
Expand All @@ -152,7 +152,8 @@ func (sm *SubscriptionManager) createSubscribeListener(topicHandler *Subscriptio

// ctx is a context provided by daprd per subscription. unrelated to the consuming sm.baseCtx
func (sm *SubscriptionManager) createUnsubscribeListener(ctx context.Context, topic string, closeCh <-chan struct{}) {
sm.logger.Debug("creating an unsubscribe listener for topic: ", topic)
sm.logger.Debug("Creating an unsubscribe listener for topic: ", topic)

defer sm.unsubscribe(topic)
for {
select {
Expand All @@ -165,7 +166,7 @@ func (sm *SubscriptionManager) createUnsubscribeListener(ctx context.Context, to
}

func (sm *SubscriptionManager) unsubscribe(topic string) {
sm.logger.Debug("unsubscribing from topic: ", topic)
sm.logger.Debug("Unsubscribing from topic: ", topic)

if value, ok := sm.GetSubscriptionTopicHandler(topic); ok {
sm.topicsChangeCh <- changeSubscriptionTopicHandler{Unsubscribe, value}
Expand Down

0 comments on commit 163489b

Please sign in to comment.