From 55e2e34b42aa84a9ae51154186ad2c8c85e73a83 Mon Sep 17 00:00:00 2001 From: dengzhiwen1 Date: Mon, 8 May 2023 21:18:58 +0800 Subject: [PATCH] fix: close the channel if the consumer has been shutdown before writing data --- consumer/consumer.go | 4 ---- consumer/process_queue.go | 7 ++++--- consumer/push_consumer.go | 1 + 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 7bd2b027..60228a25 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -297,10 +297,6 @@ func (dc *defaultConsumer) shutdown() error { k := key.(primitive.MessageQueue) pq := value.(*processQueue) pq.WithDropped(true) - // close msg channel using RWMutex to make sure no data was writing - pq.mutex.Lock() - close(pq.msgCh) - pq.mutex.Unlock() mqs = append(mqs, &k) return true }) diff --git a/consumer/process_queue.go b/consumer/process_queue.go index aeb66d82..a92298ff 100644 --- a/consumer/process_queue.go +++ b/consumer/process_queue.go @@ -125,7 +125,8 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) { select { case <-pq.closeChan: return - case pq.msgCh <- messages: + default: + pq.msgCh <- messages } } @@ -293,8 +294,8 @@ func (pq *processQueue) getMessages() []*primitive.MessageExt { select { case <-pq.closeChan: return nil - case mq := <-pq.msgCh: - return mq + default: + return <-pq.msgCh } } diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 85f9725b..05b2748e 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -643,6 +643,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { NEXT: select { case <-pc.done: + close(pq.msgCh) rlog.Info("push consumer close message handle.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, })