Fix send on closed channel panic #140

Open
wants to merge 1 commit into
base: master

echistyakov
Contributor

Fix send-on-closed-channel panic in blockSubscriber

Motivation:

I caught this bug while stress-testing the RSocket-based client/server implementation in Facebook Thrift: https://github.com/facebook/fbthrift/blob/main/thrift/lib/go/thrift/stress/server_test.go

==================
WARNING: DATA RACE
Read at 0x00c0042aa1d0 by goroutine 3166913:
  runtime.chansend()
      third-party/go/1.23.4/linux_amd64/src/runtime/chan.go:171 +0x0
  github.com/rsocket/rsocket-go/rx/mono.blockSubscriber.OnError()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/block_subscriber.go:42 +0xc9
  github.com/rsocket/rsocket-go/rx/mono.(*blockSubscriber).OnError()
      <autogenerated>:1 +0x66
  github.com/jjeffcaii/reactor-go/mono.(*processorSubscriber).OnError()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:195 +0x6b
  github.com/jjeffcaii/reactor-go/mono.(*processorSubscriber).OnSubscribe()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:227 +0xdd
  github.com/jjeffcaii/reactor-go/mono.(*processor).SubscribeWith.func1()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:119 +0x64
  github.com/panjf2000/ants/v2.(*goWorker).run.func1()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:67 +0x130

Previous write at 0x00c0042aa1d0 by goroutine 3166370:
  runtime.closechan()
      third-party/go/1.23.4/linux_amd64/src/runtime/chan.go:397 +0x0
  github.com/rsocket/rsocket-go/rx/mono.toBlock.deferwrap2()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/utils.go:173 +0x33
  runtime.deferreturn()
      third-party/go/1.23.4/linux_amd64/src/runtime/panic.go:605 +0x5d
  github.com/rsocket/rsocket-go/rx/mono.(*oneshotProxy).BlockUnsafe()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/proxy_oneshot.go:114 +0x5a
  github.com/rsocket/rsocket-go/rx/mono.(*oneshotProxy).Block()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/proxy_oneshot.go:126 +0x78
  thrift/lib/go/thrift.(*rsocketClient).RequestResponse()
      fbcode/thrift/lib/go/thrift/rocket_rsocket_client.go:148 +0x15d
  thrift/lib/go/thrift.(*rocketClient).Flush()
      fbcode/thrift/lib/go/thrift/rocket_client.go:123 +0x436
  github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).sendMsg()
      fbcode/thrift/lib/go/thrift/types/serial_channel.go:61 +0x2f0
  github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).Call()
      fbcode/thrift/lib/go/thrift/types/serial_channel.go:118 +0x14f
  github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyChannelClient).Echo()
      fbcode/thrift/lib/go/thrift/dummy/svcs.go:87 +0x114
  github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyClient).Echo()
      fbcode/thrift/lib/go/thrift/dummy/svcs.go:95 +0x6c
  thrift/lib/go/thrift/stress.runStressTest.func4()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:127 +0x2cd
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      fbcode/third-party-go/vendor/golang.org/x/sync/errgroup/errgroup.go:78 +0xa1

Goroutine 3166913 (running) created at:
  github.com/panjf2000/ants/v2.(*goWorker).run()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:48 +0xc4
  github.com/panjf2000/ants/v2.(*Pool).retrieveWorker()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/pool.go:348 +0x384
  github.com/panjf2000/ants/v2.(*Pool).Submit()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/pool.go:222 +0x67
  github.com/jjeffcaii/reactor-go/scheduler.(*elasticScheduler).Do()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/scheduler/elastic.go:34 +0x56
  github.com/jjeffcaii/reactor-go/mono.(*processor).SubscribeWith()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:118 +0x232
  github.com/jjeffcaii/reactor-go/mono.(*wrapper).SubscribeWith()
      <autogenerated>:1 +0x72
  github.com/rsocket/rsocket-go/rx/mono.toBlock()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/utils.go:169 +0x141
  github.com/rsocket/rsocket-go/rx/mono.(*oneshotProxy).BlockUnsafe()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/proxy_oneshot.go:114 +0x5a
  github.com/rsocket/rsocket-go/rx/mono.(*oneshotProxy).Block()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/proxy_oneshot.go:126 +0x78
  thrift/lib/go/thrift.(*rsocketClient).RequestResponse()
      fbcode/thrift/lib/go/thrift/rocket_rsocket_client.go:148 +0x15d
  thrift/lib/go/thrift.(*rocketClient).Flush()
      fbcode/thrift/lib/go/thrift/rocket_client.go:123 +0x436
  github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).sendMsg()
      fbcode/thrift/lib/go/thrift/types/serial_channel.go:61 +0x2f0
  github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).Call()
      fbcode/thrift/lib/go/thrift/types/serial_channel.go:118 +0x14f
  github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyChannelClient).Echo()
      fbcode/thrift/lib/go/thrift/dummy/svcs.go:87 +0x114
  github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyClient).Echo()
      fbcode/thrift/lib/go/thrift/dummy/svcs.go:95 +0x6c
  thrift/lib/go/thrift/stress.runStressTest.func4()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:127 +0x2cd
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      fbcode/third-party-go/vendor/golang.org/x/sync/errgroup/errgroup.go:78 +0xa1

Goroutine 3166370 (running) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      fbcode/third-party-go/vendor/golang.org/x/sync/errgroup/errgroup.go:75 +0x15c
  thrift/lib/go/thrift/stress.runStressTest()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:151 +0x77e
  thrift/lib/go/thrift/stress.TestServerStress.func1()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:45 +0x2b
  testing.tRunner()
      third-party/go/1.23.4/linux_amd64/src/testing/testing.go:1690 +0x226
  testing.(*T).Run.gowrap1()
      third-party/go/1.23.4/linux_amd64/src/testing/testing.go:1743 +0x44
==================
2025/01/13 11:47:01.150080 [ants]: worker exits from panic: send on closed channel
goroutine 3161627 [running]:
runtime/debug.Stack()
        third-party/go/1.23.4/linux_amd64/src/runtime/debug/stack.go:26 +0x67
github.com/panjf2000/ants/v2.(*goWorker).run.func1.1()
        fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:56 +0x15d
panic({0x2583a0?, 0x352ee0?})
        third-party/go/1.23.4/linux_amd64/src/runtime/panic.go:785 +0x132
github.com/rsocket/rsocket-go/rx/mono.blockSubscriber.OnError({0xc0034fe070?, 0xc0042aa150?, 0xc0042aa1c0?}, {0x3553e0, 0xc002786060})
        fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/block_subscriber.go:42 +0xca
github.com/jjeffcaii/reactor-go/mono.(*processorSubscriber).OnError(0xc002e5e040, {0x3553e0, 0xc002786060})
        fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:195 +0x6c
github.com/jjeffcaii/reactor-go/mono.(*processorSubscriber).OnSubscribe(0xc002e5e040, {0x357a68, 0xc004b00700}, {0x356f00, 0xc002e5e040})
        fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:227 +0xde
github.com/jjeffcaii/reactor-go/mono.(*processor).SubscribeWith.func1()
        fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:119 +0x65
github.com/panjf2000/ants/v2.(*goWorker).run.func1()
        fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:67 +0x131
created by github.com/panjf2000/ants/v2.(*goWorker).run in goroutine 3166632
        fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:48 +0xc5

panic: send on closed channel

goroutine 3166696 [running]:
github.com/rsocket/rsocket-go/rx/mono.blockSubscriber.OnError({0xc0019be230?, 0xc003f06070?, 0xc003f060e0?}, {0x354340, 0xad0570})
        fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/block_subscriber.go:42 +0xca
github.com/rsocket/rsocket-go/rx/mono.blockSubscriber.OnSubscribe.func1()
        fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/block_subscriber.go:64 +0x12d
created by github.com/rsocket/rsocket-go/rx/mono.blockSubscriber.OnSubscribe in goroutine 3165248
        fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/block_subscriber.go:61 +0x1cd

The panic is caused by a race condition on the following lines in blockSubscriber.OnError (block_subscriber.go):

if common.SafeCloseDoneChan(b.done) {
	b.echan <- err
}

  • The lines above first close the done channel and then send on echan.
  • Closing done wakes the receiver (toBlock in utils.go), which may run to completion before the send on echan happens. In that case the receiver's deferred close(echan) runs first, and the sender (blockSubscriber) then sends on a closed channel, which panics.

rsocket-go/rx/mono/utils.go

Lines 170 to 182 in 099cb5b

<-done
defer close(vchan)
defer close(echan)
select {
case value := <-vchan:
	return value, nil
case err := <-echan:
	return nil, err
default:
	return nil, nil
}

Modifications:

  • Make blockSubscriber the explicit owner of all the channels (done/vchan/echan).
  • Ensure that the done channel can be closed only once by protecting it with an atomic.Bool that gets swapped to true after done is closed.
  • Stop closing vchan/echan. Go does not require channels to be closed, and unreferenced channels are simply garbage collected.

Result:

With this change, I could no longer reproduce this failure/panic.
