diff --git a/packages/async-flow/src/async-flow.js b/packages/async-flow/src/async-flow.js index 433f2bdb26db..56b38713f4d6 100644 --- a/packages/async-flow/src/async-flow.js +++ b/packages/async-flow/src/async-flow.js @@ -199,7 +199,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { // host-to-guest call by "manually" sending the arguments through // and calling the guest function ourselves. Likewise, we // special case the handling of the guestResultP, rather than - // as the membrane to make a host vow for a guest promise. + // ask the membrane to make a host vow for a guest promise. // To support this special casing, we store additional replay // data in this internal flow instance -- the host activationArgs // and the host outcome vow kit. diff --git a/packages/async-flow/src/bijection.js b/packages/async-flow/src/bijection.js index 8f06b3d55f96..5cf011da69f1 100644 --- a/packages/async-flow/src/bijection.js +++ b/packages/async-flow/src/bijection.js @@ -100,14 +100,14 @@ export const prepareBijection = zone => { const hostToGuest = h2g.for(self); if (guestToHost.has(g)) { - guestToHost.get(g) === h || + toPassableCap(guestToHost.get(g)) === toPassableCap(h) || Fail`internal: g->h ${g} -> ${h} vs ${guestToHost.get(g)}`; hostToGuest.get(h) === g || Fail`internal h->g: ${h} -> ${g} vs ${hostToGuest.get(h)}`; return true; } else { !hostToGuest.has(h) || - Fail`internal: unexpected h->g ${h} -> ${guestToHost.get(h)}`; + Fail`internal: unexpected h->g ${h} -> ${hostToGuest.get(h)}`; return false; } }, diff --git a/packages/async-flow/src/replay-membrane.js b/packages/async-flow/src/replay-membrane.js index 556a1a2bdb8a..99f09fd78bd5 100644 --- a/packages/async-flow/src/replay-membrane.js +++ b/packages/async-flow/src/replay-membrane.js @@ -264,6 +264,22 @@ export const makeReplayMembrane = ( watchWake(hVow); + // The replay membrane is the only component inserting entries into + // the log. In particular, the flow's vow durable watcher does not log the + // settlement outcome, and instead it's the responsibility of the + // membrane's ephemeral handler. Because of this, the membrane's handler + // must be careful to: + // - Be added to the vow if the settlement has not yet been recorded in + // the log. + // - Insert a single settlement outcome in the log for the given vow. + // + // In practice the former is accomplished by a handler always being + // added to the host vow when creating a guest promise, and the + // handler checking after replay is complete, whether the guest promise + // is already settled (by the log replay) or not. The latter is + // accomplished by checking that the membrane has not been stopped + // before updating the log. + void when( hVow, async hostFulfillment => { @@ -272,7 +288,13 @@ export const makeReplayMembrane = ( /** @type {LogEntry} */ const entry = harden(['doFulfill', hVow, hostFulfillment]); log.pushEntry(entry); - interpretOne(topDispatch, entry); // does its own panic + try { + interpretOne(topDispatch, entry); + } catch { + // interpretOne does its own try/catch/panic, so failure would + // already be registered. Here, just return to avoid the + // Unhandled rejection. + } } }, async hostReason => { @@ -281,7 +303,13 @@ export const makeReplayMembrane = ( /** @type {LogEntry} */ const entry = harden(['doReject', hVow, hostReason]); log.pushEntry(entry); - interpretOne(topDispatch, entry); // does its own panic + try { + interpretOne(topDispatch, entry); + } catch { + // interpretOne does its own try/catch/panic, so failure would + // already be registered. Here, just return to avoid the + // Unhandled rejection. + } } }, ); diff --git a/packages/async-flow/test/equate.test.js b/packages/async-flow/test/equate.test.js index c90040a42a6e..10876503a2bb 100644 --- a/packages/async-flow/test/equate.test.js +++ b/packages/async-flow/test/equate.test.js @@ -56,7 +56,7 @@ const testEquate = (t, zone, showOnConsole = false) => { message: 'internal: g->h "[Alleged: g1]" -> "[Vow]" vs "[Alleged: h1]"', }); t.throws(() => equate(g2, h1), { - message: 'key not found: "[Alleged: h1]"', + message: 'internal: unexpected h->g "[Alleged: h1]" -> "[Alleged: g1]"', }); bij.init(g2, h2); equate(g2, h2); diff --git a/packages/async-flow/test/replay-membrane-zombie.test.js b/packages/async-flow/test/replay-membrane-zombie.test.js new file mode 100644 index 000000000000..2cad21ab2006 --- /dev/null +++ b/packages/async-flow/test/replay-membrane-zombie.test.js @@ -0,0 +1,158 @@ +// eslint-disable-next-line import/order +import { + test, + getBaggage, + annihilate, + nextLife, +} from './prepare-test-env-ava.js'; + +import { Fail } from '@endo/errors'; +import { eventLoopIteration } from '@agoric/internal/src/testing-utils.js'; +import { prepareVowTools } from '@agoric/vow'; +import { makeHeapZone } from '@agoric/zone/heap.js'; +import { makeVirtualZone } from '@agoric/zone/virtual.js'; +import { makeDurableZone } from '@agoric/zone/durable.js'; + +import { prepareLogStore } from '../src/log-store.js'; +import { prepareBijection } from '../src/bijection.js'; +import { makeReplayMembrane } from '../src/replay-membrane.js'; + +const watchWake = _vowish => {}; +const panic = problem => Fail`panic over ${problem}`; + +/** + * @param {any} t + * @param {Zone} zone + */ +const testMissingStop = async (t, zone) => { + const vowTools = prepareVowTools(zone); + const { makeVowKit } = vowTools; + const makeLogStore = prepareLogStore(zone); + const makeBijection = prepareBijection(zone); + + const log = makeLogStore(); + const bij = makeBijection(); + + const memA = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + + const { vow: v1, resolver: r1 } = makeVowKit(); + + const p1A = memA.hostToGuest(v1); + t.true(bij.has(p1A, v1)); + + await eventLoopIteration(); + + t.deepEqual(log.dump(), []); + + // do all the steps to drop an old membrane and set up a new membrane, + // except stopping the old membrane, + // to demonstate why `makeGuestForHostVow` also tests`stopped`. + log.reset(); + bij.reset(); + const memB = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + + const p1B = memB.hostToGuest(v1); + t.true(bij.has(p1B, v1)); + t.false(bij.hasGuest(p1A)); + + await eventLoopIteration(); + + t.deepEqual(log.dump(), []); + + r1.resolve('x'); + + await eventLoopIteration(); + + t.deepEqual(log.dump(), [ + // keep line break + ['doFulfill', v1, 'x'], + ['doFulfill', v1, 'x'], // this duplication is wrong, is the point + ]); +}; + +/** + * @param {any} t + * @param {Zone} zone + */ +const testProperStop = async (t, zone) => { + const vowTools = prepareVowTools(zone); + const { makeVowKit } = vowTools; + const makeLogStore = prepareLogStore(zone); + const makeBijection = prepareBijection(zone); + + const log = makeLogStore(); + const bij = makeBijection(); + + const memA = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + + const { vow: v1, resolver: r1 } = makeVowKit(); + + const p1A = memA.hostToGuest(v1); + t.true(bij.has(p1A, v1)); + + await eventLoopIteration(); + + t.deepEqual(log.dump(), []); + + // do all the steps to drop an old membrane and set up a new membrane, + // including stopping the old membrane, + // to demonstate why `makeGuestForHostVow` also tests`stopped`. + log.reset(); + bij.reset(); + memA.stop(); // the point + const memB = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + + const p1B = memB.hostToGuest(v1); + t.true(bij.has(p1B, v1)); + t.false(bij.hasGuest(p1A)); + + await eventLoopIteration(); + + t.deepEqual(log.dump(), []); + + r1.resolve('x'); + + await eventLoopIteration(); + + t.deepEqual(log.dump(), [ + // keep line break + ['doFulfill', v1, 'x'], + ]); +}; + +await test.serial('test heap replay-membrane missing stop', async t => { + const zone = makeHeapZone('heapRoot'); + return testMissingStop(t, zone); +}); + +await test.serial('test heap replay-membrane proper stop', async t => { + annihilate(); + const zone = makeHeapZone('heapRoot'); + return testProperStop(t, zone); +}); + +await test.serial('test virtual replay-membrane missing stop', async t => { + annihilate(); + const zone = makeVirtualZone('virtualRoot'); + return testMissingStop(t, zone); +}); + +await test.serial('test virtual replay-membrane proper stop', async t => { + annihilate(); + const zone = makeVirtualZone('virtualRoot'); + return testProperStop(t, zone); +}); + +await test.serial('test durable replay-membrane missing stop', async t => { + annihilate(); + nextLife(); + const zone = makeDurableZone(getBaggage(), 'durableRoot'); + return testMissingStop(t, zone); +}); + +await test.serial('test durable replay-membrane proper stop', async t => { + annihilate(); + nextLife(); + const zone = makeDurableZone(getBaggage(), 'durableRoot'); + return testProperStop(t, zone); +});