From 5ce76b8766602b8a1e383fd48f1dcd692d1b18de Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Wed, 18 Jan 2023 22:51:12 -0500 Subject: [PATCH] Fix #375 (I think) This addresses #375; the now-deleted comment about the time gap being OK has been incorrect at least since we started handling `receiverAnswer`s specially; the test that was failing was one where a reference to an answer's promise filed (and transitively pcall) is handed to a method implementation. This patch avoids having a gap where pcall & promise are nil by: - Changing setPipelineCaller so it can be called before we release the lock - Introducing a stand-in type for the pipeline caller that we can set pcall to while the RecvCall is ongoing. Before fixing the bug it took many thousands or tens of thousands of test runs to trigger it, so while I have not been able to reproduce the issue after tens of thousands of runs, that alone is inconclusive -- but I *think* this fixes the bug. It definitely fixes something. --- rpc/answer.go | 17 +++++++------- rpc/rpc.go | 62 +++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 61 insertions(+), 18 deletions(-) diff --git a/rpc/answer.go b/rpc/answer.go index ce9c61a7..6d798e2b 100644 --- a/rpc/answer.go +++ b/rpc/answer.go @@ -8,7 +8,6 @@ import ( "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/internal/rc" - "capnproto.org/go/capnp/v3/internal/syncutil" rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" ) @@ -129,16 +128,16 @@ func (c *Conn) newReturn() (_ rpccp.Return, sendMsg func(), _ *rc.Releaser, _ er } // setPipelineCaller sets ans.pcall to pcall if the answer has not -// already returned. The caller MUST NOT hold ans.c.lk. +// already returned. The caller MUST hold ans.c.lk. // // This also sets ans.promise to a new promise, wrapping pcall. -func (ans *answer) setPipelineCaller(m capnp.Method, pcall capnp.PipelineCaller) { - syncutil.With(&ans.c.lk, func() { - if !ans.flags.Contains(resultsReady) { - ans.pcall = pcall - ans.promise = capnp.NewPromise(m, pcall) - } - }) +func (ans *answer) setPipelineCaller(c *lockedConn, m capnp.Method, pcall capnp.PipelineCaller) { + c.assertIs(ans.c) + + if !ans.flags.Contains(resultsReady) { + ans.pcall = pcall + ans.promise = capnp.NewPromise(m, pcall) + } } // AllocResults allocates the results struct. diff --git a/rpc/rpc.go b/rpc/rpc.go index 99cabdeb..6751f443 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -846,12 +846,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn c.tasks.Add(1) // will be finished by answer.Return var callCtx context.Context callCtx, ans.cancel = context.WithCancel(c.bgctx) + pcall := newPromisedPipelineCaller() + ans.setPipelineCaller(c, p.method, pcall) rl.Add(func() { - pcall := ent.client.RecvCall(callCtx, recv) - // Place PipelineCaller into answer. Since the receive goroutine is - // the only one that uses answer.pcall, it's fine that there's a - // time gap for this being set. - ans.setPipelineCaller(p.method, pcall) + pcall.resolve(ent.client.RecvCall(callCtx, recv)) }) return nil case rpccp.MessageTarget_Which_promisedAnswer: @@ -908,9 +906,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn c.tasks.Add(1) // will be finished by answer.Return var callCtx context.Context callCtx, ans.cancel = context.WithCancel(c.bgctx) + pcall := newPromisedPipelineCaller() + ans.setPipelineCaller(c, p.method, pcall) rl.Add(func() { - pcall := tgt.RecvCall(callCtx, recv) - ans.setPipelineCaller(p.method, pcall) + pcall.resolve(tgt.RecvCall(callCtx, recv)) }) } else { // Results not ready, use pipeline caller. @@ -919,10 +918,11 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn callCtx, ans.cancel = context.WithCancel(c.bgctx) tgt := tgtAns.pcall c.tasks.Add(1) // will be finished by answer.Return + pcall := newPromisedPipelineCaller() + ans.setPipelineCaller(c, p.method, pcall) rl.Add(func() { - pcall := tgt.PipelineRecv(callCtx, p.target.transform, recv) + pcall.resolve(tgt.PipelineRecv(callCtx, p.target.transform, recv)) tgtAns.pcalls.Done() - ans.setPipelineCaller(p.method, pcall) }) } return nil @@ -932,6 +932,50 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn }) } +// A promisedPipelineCaller is a PipelineCaller that stands in for another +// PipelineCaller that may not be available yet. Methods block until +// resolve() is called. +// +// NOTE WELL: This is meant to stand-in for a very short time, to avoid racy +// behavior between releasing locks and calling resolve(), so even the +// context on the recv/send methods is ignored until underlying caller is +// reserved. +type promisedPipelineCaller struct { + ready chan struct{} + underlying capnp.PipelineCaller +} + +func newPromisedPipelineCaller() *promisedPipelineCaller { + return &promisedPipelineCaller{ + ready: make(chan struct{}), + underlying: nil, + } +} + +// resolve() resolves p to result. +func (p *promisedPipelineCaller) resolve(result capnp.PipelineCaller) { + p.underlying = result + close(p.ready) +} + +func (p *promisedPipelineCaller) PipelineSend( + ctx context.Context, + transform []capnp.PipelineOp, + s capnp.Send, +) (*capnp.Answer, capnp.ReleaseFunc) { + <-p.ready + return p.underlying.PipelineSend(ctx, transform, s) +} + +func (p *promisedPipelineCaller) PipelineRecv( + ctx context.Context, + transform []capnp.PipelineOp, + r capnp.Recv, +) capnp.PipelineCaller { + <-p.ready + return p.underlying.PipelineRecv(ctx, transform, r) +} + type parsedCall struct { target parsedMessageTarget method capnp.Method