Skip to content

Commit

Permalink
Fix #375 (I think)
Browse files Browse the repository at this point in the history
This addresses #375. The now-deleted comment about the time gap being OK
has been incorrect at least since we started handling `receiverAnswer`s
specially; the test that was failing was one where a reference to an
answer's promise field (and, transitively, pcall) is handed to a method
implementation.

This patch avoids having a gap where pcall & promise are nil by:

- Changing setPipelineCaller so it can be called before we release the
  lock
- Introducing a stand-in type for the pipeline caller that we can set
  pcall to while the RecvCall is ongoing.

Before fixing the bug it took many thousands or tens of thousands of
test runs to trigger it, so while I have not been able to reproduce the
issue after tens of thousands of runs, that alone is inconclusive -- but
I *think* this fixes the bug. It definitely fixes something.
  • Loading branch information
zenhack committed Jan 19, 2023
1 parent 54d41bf commit 5ce76b8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 18 deletions.
17 changes: 8 additions & 9 deletions rpc/answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/exc"
"capnproto.org/go/capnp/v3/internal/rc"
"capnproto.org/go/capnp/v3/internal/syncutil"
rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc"
)

Expand Down Expand Up @@ -129,16 +128,16 @@ func (c *Conn) newReturn() (_ rpccp.Return, sendMsg func(), _ *rc.Releaser, _ er
}

// setPipelineCaller sets ans.pcall to pcall if the answer has not
// already returned. The caller MUST NOT hold ans.c.lk.
// already returned. The caller MUST hold ans.c.lk.
//
// This also sets ans.promise to a new promise, wrapping pcall.
func (ans *answer) setPipelineCaller(m capnp.Method, pcall capnp.PipelineCaller) {
syncutil.With(&ans.c.lk, func() {
if !ans.flags.Contains(resultsReady) {
ans.pcall = pcall
ans.promise = capnp.NewPromise(m, pcall)
}
})
func (ans *answer) setPipelineCaller(c *lockedConn, m capnp.Method, pcall capnp.PipelineCaller) {
c.assertIs(ans.c)

if !ans.flags.Contains(resultsReady) {
ans.pcall = pcall
ans.promise = capnp.NewPromise(m, pcall)
}
}

// AllocResults allocates the results struct.
Expand Down
62 changes: 53 additions & 9 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,12 +846,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
c.tasks.Add(1) // will be finished by answer.Return
var callCtx context.Context
callCtx, ans.cancel = context.WithCancel(c.bgctx)
pcall := newPromisedPipelineCaller()
ans.setPipelineCaller(c, p.method, pcall)
rl.Add(func() {
pcall := ent.client.RecvCall(callCtx, recv)
// Place PipelineCaller into answer. Since the receive goroutine is
// the only one that uses answer.pcall, it's fine that there's a
// time gap for this being set.
ans.setPipelineCaller(p.method, pcall)
pcall.resolve(ent.client.RecvCall(callCtx, recv))
})
return nil
case rpccp.MessageTarget_Which_promisedAnswer:
Expand Down Expand Up @@ -908,9 +906,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
c.tasks.Add(1) // will be finished by answer.Return
var callCtx context.Context
callCtx, ans.cancel = context.WithCancel(c.bgctx)
pcall := newPromisedPipelineCaller()
ans.setPipelineCaller(c, p.method, pcall)
rl.Add(func() {
pcall := tgt.RecvCall(callCtx, recv)
ans.setPipelineCaller(p.method, pcall)
pcall.resolve(tgt.RecvCall(callCtx, recv))
})
} else {
// Results not ready, use pipeline caller.
Expand All @@ -919,10 +918,11 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
callCtx, ans.cancel = context.WithCancel(c.bgctx)
tgt := tgtAns.pcall
c.tasks.Add(1) // will be finished by answer.Return
pcall := newPromisedPipelineCaller()
ans.setPipelineCaller(c, p.method, pcall)
rl.Add(func() {
pcall := tgt.PipelineRecv(callCtx, p.target.transform, recv)
pcall.resolve(tgt.PipelineRecv(callCtx, p.target.transform, recv))
tgtAns.pcalls.Done()
ans.setPipelineCaller(p.method, pcall)
})
}
return nil
Expand All @@ -932,6 +932,50 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
})
}

// A promisedPipelineCaller is a PipelineCaller that stands in for another
// PipelineCaller that may not be available yet. Methods block until
// resolve() is called.
//
// NOTE WELL: This is meant to stand in for a very short time, to avoid racy
// behavior between releasing locks and calling resolve(), so even the
// context on the recv/send methods is ignored until the underlying caller
// is resolved.
type promisedPipelineCaller struct {
// ready is closed by resolve() once underlying has been set; the
// PipelineSend and PipelineRecv methods block on it before delegating.
ready chan struct{}
// underlying is the PipelineCaller that calls are forwarded to. It is nil
// until resolve() stores the real caller, and is only read by the
// pipeline methods after ready has been closed.
underlying capnp.PipelineCaller
}

// newPromisedPipelineCaller returns an unresolved promisedPipelineCaller.
// Its ready channel is open and its underlying caller is nil, so its
// pipeline methods block until resolve() is called on it.
func newPromisedPipelineCaller() *promisedPipelineCaller {
	p := new(promisedPipelineCaller)
	// The underlying caller is left at its zero value (nil) until resolve().
	p.ready = make(chan struct{})
	return p
}

// resolve resolves p to result: it records result as the underlying
// PipelineCaller and then closes p.ready, which unblocks every PipelineSend
// or PipelineRecv call currently waiting on p (and any made afterwards).
//
// resolve must be called at most once per promisedPipelineCaller; a second
// call would panic, because it would close an already-closed channel.
func (p *promisedPipelineCaller) resolve(result capnp.PipelineCaller) {
// Store the result before closing the channel: waiters read p.underlying
// only after their receive on p.ready completes, so the write has to
// happen first for them to observe it.
p.underlying = result
close(p.ready)
}

// PipelineSend waits for p to be resolved and then forwards the call,
// unchanged, to the underlying PipelineCaller's PipelineSend.
//
// The wait deliberately does not watch ctx: ctx only takes effect once the
// call has been handed to the underlying caller.
func (p *promisedPipelineCaller) PipelineSend(ctx context.Context, transform []capnp.PipelineOp, s capnp.Send) (*capnp.Answer, capnp.ReleaseFunc) {
	// Block until resolve() has closed the channel.
	<-p.ready

	resolved := p.underlying
	return resolved.PipelineSend(ctx, transform, s)
}

// PipelineRecv waits for p to be resolved and then forwards the call,
// unchanged, to the underlying PipelineCaller's PipelineRecv.
//
// The wait deliberately does not watch ctx: ctx only takes effect once the
// call has been handed to the underlying caller.
func (p *promisedPipelineCaller) PipelineRecv(ctx context.Context, transform []capnp.PipelineOp, r capnp.Recv) capnp.PipelineCaller {
	// Block until resolve() has closed the channel.
	<-p.ready

	resolved := p.underlying
	return resolved.PipelineRecv(ctx, transform, r)
}

type parsedCall struct {
target parsedMessageTarget
method capnp.Method
Expand Down

0 comments on commit 5ce76b8

Please sign in to comment.