Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1051 ] Close the channel before sending data if the consumer has been shutdown. #1052

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

cserwen
Copy link
Member

@cserwen cserwen commented May 9, 2023

What is the purpose of the change

Fix #1051

Brief changelog

XX

Verifying this change

XXXX

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when a cross-module dependency exists.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@cserwen cserwen requested a review from ShannonDing May 9, 2023 07:55
@0daypwn
Copy link
Contributor

0daypwn commented Sep 28, 2023

dc.processQueueTable.Range(func(key, value interface{}) bool {
k := key.(primitive.MessageQueue)
pq := value.(*processQueue)
pq.WithDropped(true)
// close msg channel using RWMutex to make sure no data was writing
pq.mutex.Lock()
close(pq.msgCh)
pq.mutex.Unlock()
mqs = append(mqs, &k)
return true
})

谁写入channel,谁负责关闭 是个好习惯

在已经有 close(pq.closeChan) 之后,再 close(pq.msgCh) 这个操作完全是多余的,关闭还会导致上述写入的风险。
在 close(pq.closeChan) 之后,只要不 close(pq.msgCh), pq.putMessage 和 pq.getMessage 即使select到 msgCh 也不会有影响。
pq.putMessage 和 pq.getMessage 是msgCh唯二的使用者,之后是一定可以return的,也不存在会阻塞/泄露之类的问题。
因此我认为只去掉 close(pq.msgCh) 这部分即可 @cserwen

另外 consumer/consumer.go 这部分是基类
修改需要考虑
consumer/push_consumer.go
consuemr/pull_consumer.go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Close the channel before sending data if the consumer has been shutdown.
2 participants