diff --git a/rpc/answer.go b/rpc/answer.go index ce9c61a7..6d798e2b 100644 --- a/rpc/answer.go +++ b/rpc/answer.go @@ -8,7 +8,6 @@ import ( "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/internal/rc" - "capnproto.org/go/capnp/v3/internal/syncutil" rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" ) @@ -129,16 +128,16 @@ func (c *Conn) newReturn() (_ rpccp.Return, sendMsg func(), _ *rc.Releaser, _ er } // setPipelineCaller sets ans.pcall to pcall if the answer has not -// already returned. The caller MUST NOT hold ans.c.lk. +// already returned. The caller MUST hold ans.c.lk. // // This also sets ans.promise to a new promise, wrapping pcall. -func (ans *answer) setPipelineCaller(m capnp.Method, pcall capnp.PipelineCaller) { - syncutil.With(&ans.c.lk, func() { - if !ans.flags.Contains(resultsReady) { - ans.pcall = pcall - ans.promise = capnp.NewPromise(m, pcall) - } - }) +func (ans *answer) setPipelineCaller(c *lockedConn, m capnp.Method, pcall capnp.PipelineCaller) { + c.assertIs(ans.c) + + if !ans.flags.Contains(resultsReady) { + ans.pcall = pcall + ans.promise = capnp.NewPromise(m, pcall) + } } // AllocResults allocates the results struct. diff --git a/rpc/rpc.go b/rpc/rpc.go index 99cabdeb..6751f443 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -846,12 +846,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn c.tasks.Add(1) // will be finished by answer.Return var callCtx context.Context callCtx, ans.cancel = context.WithCancel(c.bgctx) + pcall := newPromisedPipelineCaller() + ans.setPipelineCaller(c, p.method, pcall) rl.Add(func() { - pcall := ent.client.RecvCall(callCtx, recv) - // Place PipelineCaller into answer. Since the receive goroutine is - // the only one that uses answer.pcall, it's fine that there's a - // time gap for this being set. - ans.setPipelineCaller(p.method, pcall) + pcall.resolve(ent.client.RecvCall(callCtx, recv)) }) return nil case rpccp.MessageTarget_Which_promisedAnswer: @@ -908,9 +906,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn c.tasks.Add(1) // will be finished by answer.Return var callCtx context.Context callCtx, ans.cancel = context.WithCancel(c.bgctx) + pcall := newPromisedPipelineCaller() + ans.setPipelineCaller(c, p.method, pcall) rl.Add(func() { - pcall := tgt.RecvCall(callCtx, recv) - ans.setPipelineCaller(p.method, pcall) + pcall.resolve(tgt.RecvCall(callCtx, recv)) }) } else { // Results not ready, use pipeline caller. @@ -919,10 +918,11 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn callCtx, ans.cancel = context.WithCancel(c.bgctx) tgt := tgtAns.pcall c.tasks.Add(1) // will be finished by answer.Return + pcall := newPromisedPipelineCaller() + ans.setPipelineCaller(c, p.method, pcall) rl.Add(func() { - pcall := tgt.PipelineRecv(callCtx, p.target.transform, recv) + pcall.resolve(tgt.PipelineRecv(callCtx, p.target.transform, recv)) tgtAns.pcalls.Done() - ans.setPipelineCaller(p.method, pcall) }) } return nil @@ -932,6 +932,50 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn }) } +// A promisedPipelineCaller is a PipelineCaller that stands in for another +// PipelineCaller that may not be available yet. Methods block until +// resolve() is called. +// +// NOTE WELL: This is meant to stand-in for a very short time, to avoid racy +// behavior between releasing locks and calling resolve(), so even the +// context on the recv/send methods is ignored until underlying caller is +// reserved. +type promisedPipelineCaller struct { + ready chan struct{} + underlying capnp.PipelineCaller +} + +func newPromisedPipelineCaller() *promisedPipelineCaller { + return &promisedPipelineCaller{ + ready: make(chan struct{}), + underlying: nil, + } +} + +// resolve() resolves p to result. +func (p *promisedPipelineCaller) resolve(result capnp.PipelineCaller) { + p.underlying = result + close(p.ready) +} + +func (p *promisedPipelineCaller) PipelineSend( + ctx context.Context, + transform []capnp.PipelineOp, + s capnp.Send, +) (*capnp.Answer, capnp.ReleaseFunc) { + <-p.ready + return p.underlying.PipelineSend(ctx, transform, s) +} + +func (p *promisedPipelineCaller) PipelineRecv( + ctx context.Context, + transform []capnp.PipelineOp, + r capnp.Recv, +) capnp.PipelineCaller { + <-p.ready + return p.underlying.PipelineRecv(ctx, transform, r) +} + type parsedCall struct { target parsedMessageTarget method capnp.Method