From a461f24a187f17149a9629b1f365da53315e76de Mon Sep 17 00:00:00 2001 From: "Mark S. Miller" Date: Sun, 5 May 2024 22:00:23 -0700 Subject: [PATCH] feat(asyncFlow): E support --- packages/async-flow/src/replay-membrane.js | 127 ++++++++++++++++-- packages/async-flow/src/type-guards.js | 14 +- packages/async-flow/src/types.js | 6 + .../test/replay-membrane-eventual.test.js | 108 ++++++++++++++- 4 files changed, 233 insertions(+), 22 deletions(-) diff --git a/packages/async-flow/src/replay-membrane.js b/packages/async-flow/src/replay-membrane.js index e675c02bbbe..eb63c0ac7f4 100644 --- a/packages/async-flow/src/replay-membrane.js +++ b/packages/async-flow/src/replay-membrane.js @@ -8,6 +8,8 @@ import { makeConvertKit } from './convert.js'; /** * @import {PromiseKit} from '@endo/promise-kit' + * @import {PassableCap} from '@endo/pass-style' + * @import {VowKit} from '@agoric/vow' */ const { fromEntries, defineProperties, assign } = Object; @@ -26,7 +28,7 @@ export const makeReplayMembrane = ( watchWake, panic, ) => { - const { when } = vowTools; + const { when, makeVowKit } = vowTools; const equate = makeEquate(bijection); @@ -208,12 +210,111 @@ export const makeReplayMembrane = ( // //////////////// Eventual Send //////////////////////////////////////////// + /** + * @param {PassableCap} hostTarget + * @param {string | undefined} optVerb + * @param {Passable[]} hostArgs + * @param {number} callIndex + * @param {VowKit} hostResultKit + * @param {Promise} guestReturnedP + * @returns {Outcome} + */ + const performSend = ( + hostTarget, + optVerb, + hostArgs, + callIndex, + hostResultKit, + guestReturnedP, + ) => { + const { vow, resolver } = hostResultKit; + try { + const hostPromise = optVerb + ? E(hostTarget)[optVerb](...hostArgs) + : E(hostTarget)(...hostArgs); + resolver.resolve(hostPromise); // TODO does this always work? + } catch (hostProblem) { + throw Fail`internal: eventual send synchrously failed ${hostProblem}`; + } + try { + const entry = harden(['doReturn', callIndex, vow]); + log.pushEntry(entry); + const guestPromise = makeGuestForHostVow(vow, guestReturnedP); + // Note that `guestPromise` is not registered in the bijection since + // guestReturnedP is already the guest for vow. Rather, the handler + // returns guestPromise to resolve guestReturnedP to guestPromise. + const { kind } = doReturn(callIndex, vow); + kind === 'return' || Fail`internal: "return" kind expected ${q(kind)}`; + return harden({ + kind: 'return', + result: guestPromise, + }); + } catch (problem) { + throw panic(problem); + } + }; + const guestHandler = harden({ applyMethod(guestTarget, optVerb, guestArgs, guestReturnedP) { - if (optVerb === undefined) { - throw Panic`guest eventual call not yet supported: ${guestTarget}(${b(guestArgs)}) -> ${b(guestReturnedP)}`; - } else { - throw Panic`guest eventual send not yet supported: ${guestTarget}.${b(optVerb)}(${b(guestArgs)}) -> ${b(guestReturnedP)}`; + const callIndex = log.getIndex(); + if (stopped || !bijection.hasGuest(guestTarget)) { + Fail`Sent from a previous run: ${guestTarget}`; + } + // TODO FIX BUG this is not quite right. When guestResultP is returned + // as the resolution of guestResultP, it create a visious cycle error. + const hostResultKit = makeVowKit(); + bijection.init(guestReturnedP, hostResultKit.vow); + /** @type {Outcome} */ + let outcome; + try { + const guestEntry = harden([ + 'checkSend', + guestTarget, + optVerb, + guestArgs, + callIndex, + ]); + if (log.isReplaying()) { + const entry = log.nextEntry(); + equate( + guestEntry, + entry, + `replay ${callIndex}: + ${q(guestEntry)} + vs ${q(entry)} + `, + ); + outcome = /** @type {Outcome} */ (nestInterpreter(callIndex)); + } else { + const entry = guestToHost(guestEntry); + log.pushEntry(entry); + const [_op, hostTarget, _optVerb, hostArgs, _callIndex] = entry; + nestInterpreter(callIndex); + outcome = performSend( + hostTarget, + optVerb, + hostArgs, + callIndex, + hostResultKit, + guestReturnedP, + ); + } + } catch (fatalError) { + throw panic(fatalError); + } + + switch (outcome.kind) { + case 'return': { + return outcome.result; + } + case 'throw': { + throw outcome.problem; + } + default: { + // @ts-expect-error TS correctly knows this case would be outside + // the type. But that's what we want to check. + throw Panic`unexpected outcome kind ${q(outcome.kind)}`; + } } }, applyFunction(guestTarget, guestArgs, guestReturnedP) { @@ -315,11 +416,19 @@ export const makeReplayMembrane = ( /** * @param {Vow} hVow + * @param {Promise} [promiseKey] + * If provided, use this promise as the key in the guestPromiseMap + * rather than the returned promise. This only happens when the + * promiseKey ends up forwarded to the returned promise anyway, so + * associating it with this resolve/reject pair is not incorrect. + * It is needed when `promiseKey` is also entered into the bijection + * paired with hVow. * @returns {Promise} */ - const makeGuestForHostVow = hVow => { + const makeGuestForHostVow = (hVow, promiseKey = undefined) => { const { promise, resolve, reject } = makeGuestPromiseKit(); - guestPromiseMap.set(promise, harden({ resolve, reject })); + promiseKey ??= promise; + guestPromiseMap.set(promiseKey, harden({ resolve, reject })); watchWake(hVow); @@ -343,7 +452,7 @@ export const makeReplayMembrane = ( hVow, async hostFulfillment => { await log.promiseReplayDone(); // should never reject - if (!stopped && guestPromiseMap.get(promise) !== 'settled') { + if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') { /** @type {LogEntry} */ const entry = harden(['doFulfill', hVow, hostFulfillment]); log.pushEntry(entry); @@ -358,7 +467,7 @@ export const makeReplayMembrane = ( }, async hostReason => { await log.promiseReplayDone(); // should never reject - if (!stopped && guestPromiseMap.get(promise) !== 'settled') { + if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') { /** @type {LogEntry} */ const entry = harden(['doReject', hVow, hostReason]); log.pushEntry(entry); diff --git a/packages/async-flow/src/type-guards.js b/packages/async-flow/src/type-guards.js index 65551bd5f89..c3ebc526739 100644 --- a/packages/async-flow/src/type-guards.js +++ b/packages/async-flow/src/type-guards.js @@ -42,13 +42,13 @@ export const LogEntryShape = M.or( M.arrayOf(M.any()), M.number(), ], - // [ - // 'checkSend', - // M.or(M.remotable('host target'), VowShape), - // M.opt(PropertyKeyShape), - // M.arrayOf(M.any()), - // M.number(), - // ], + [ + 'checkSend', + M.or(M.remotable('host target'), VowShape), + M.opt(PropertyKeyShape), + M.arrayOf(M.any()), + M.number(), + ], // ['checkReturn', M.number(), M.any()], // ['checkThrow', M.number(), M.any()], ); diff --git a/packages/async-flow/src/types.js b/packages/async-flow/src/types.js index f018c081a51..96c5031a5e3 100644 --- a/packages/async-flow/src/types.js +++ b/packages/async-flow/src/types.js @@ -95,6 +95,12 @@ * optVerb: PropertyKey|undefined, * args: Host[], * callIndex: number + * ] | [ + * op: 'checkSend', + * target: Host, + * optVerb: PropertyKey|undefined, + * args: Host[], + * callIndex: number * ]} LogEntry */ diff --git a/packages/async-flow/test/replay-membrane-eventual.test.js b/packages/async-flow/test/replay-membrane-eventual.test.js index 51d7ff72ae6..af42bb6e8e5 100644 --- a/packages/async-flow/test/replay-membrane-eventual.test.js +++ b/packages/async-flow/test/replay-membrane-eventual.test.js @@ -7,6 +7,7 @@ import { } from './prepare-test-env-ava.js'; import { Fail } from '@endo/errors'; +import { eventLoopIteration } from '@agoric/internal/src/testing-utils.js'; import { prepareVowTools } from '@agoric/vow'; import { E } from '@endo/eventual-send'; // import E from '@agoric/vow/src/E.js'; @@ -39,15 +40,19 @@ const preparePingee = zone => */ const testFirstPlay = async (t, zone) => { const vowTools = prepareVowTools(zone); + const { makeVowKit } = vowTools; const makeLogStore = prepareLogStore(zone); const makeBijection = prepareBijection(zone); const makePingee = preparePingee(zone); + const { vow: v1, resolver: r1 } = zone.makeOnce('v1', () => makeVowKit()); + const { vow: _v2, resolver: _r2 } = zone.makeOnce('v2', () => makeVowKit()); const log = zone.makeOnce('log', () => makeLogStore()); const bij = zone.makeOnce('bij', makeBijection); const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + const p1 = mem.hostToGuest(v1); t.deepEqual(log.dump(), []); /** @type {Pingee} */ @@ -56,18 +61,105 @@ const testFirstPlay = async (t, zone) => { const guestPingee = mem.hostToGuest(pingee); t.deepEqual(log.dump(), []); - const pingTestSendResult = t.throwsAsync(() => E(guestPingee).ping('send'), { - message: - 'panic over "[Error: guest eventual send not yet supported: \\"[Alleged: Pingee guest wrapper]\\".ping([\\"send\\"]) -> \\"[Promise]\\"]"', - }); + const p = E(guestPingee).ping('send'); + + guestPingee.ping('call'); + + t.is(await p, undefined); + const dump = log.dump(); + const v3 = dump[3][2]; + t.deepEqual(dump, [ + ['checkCall', pingee, 'ping', ['call'], 0], + ['doReturn', 0, undefined], + ['checkSend', pingee, 'ping', ['send'], 2], + ['doReturn', 2, v3], + ['doFulfill', v3, undefined], + ]); + + r1.resolve('x'); + t.is(await p1, 'x'); + + t.deepEqual(log.dump(), [ + ['checkCall', pingee, 'ping', ['call'], 0], + ['doReturn', 0, undefined], + ['checkSend', pingee, 'ping', ['send'], 2], + ['doReturn', 2, v3], + ['doFulfill', v3, undefined], + ['doFulfill', v1, 'x'], + ]); +}; + +/** + * @param {any} t + * @param {Zone} zone + */ +const testReplay = async (t, zone) => { + const vowTools = prepareVowTools(zone); + prepareLogStore(zone); + prepareBijection(zone); + preparePingee(zone); + const { vow: v1 } = zone.makeOnce('v1', () => Fail`need v1`); + const { vow: v2, resolver: r2 } = zone.makeOnce('v2', () => Fail`need v2`); + + const log = /** @type {LogStore} */ ( + zone.makeOnce('log', () => Fail`need log`) + ); + const bij = /** @type {Bijection} */ ( + zone.makeOnce('bij', () => Fail`need bij`) + ); + + const pingee = zone.makeOnce('pingee', () => Fail`need pingee`); + + const dump = log.dump(); + const v3 = dump[3][2]; + t.deepEqual(dump, [ + ['checkCall', pingee, 'ping', ['call'], 0], + ['doReturn', 0, undefined], + ['checkSend', pingee, 'ping', ['send'], 2], + ['doReturn', 2, v3], + ['doFulfill', v3, undefined], + ['doFulfill', v1, 'x'], + ]); + + const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + t.true(log.isReplaying()); + t.is(log.getIndex(), 0); + + const guestPingee = mem.hostToGuest(pingee); + const p2 = mem.hostToGuest(v2); + // @ts-expect-error TS doesn't know that r2 is a resolver + r2.resolve('y'); + await eventLoopIteration(); + + const p1 = mem.hostToGuest(v1); + mem.wake(); + t.true(log.isReplaying()); + t.is(log.getIndex(), 0); + t.deepEqual(log.dump(), [ + ['checkCall', pingee, 'ping', ['call'], 0], + ['doReturn', 0, undefined], + ['checkSend', pingee, 'ping', ['send'], 2], + ['doReturn', 2, v3], + ['doFulfill', v3, undefined], + ['doFulfill', v1, 'x'], + ]); + + E(guestPingee).ping('send'); guestPingee.ping('call'); - await pingTestSendResult; + t.is(await p1, 'x'); + t.is(await p2, 'y'); + t.false(log.isReplaying()); t.deepEqual(log.dump(), [ ['checkCall', pingee, 'ping', ['call'], 0], ['doReturn', 0, undefined], + ['checkSend', pingee, 'ping', ['send'], 2], + ['doReturn', 2, v3], + ['doFulfill', v3, undefined], + ['doFulfill', v1, 'x'], + ['doFulfill', v2, 'y'], ]); }; @@ -87,5 +179,9 @@ test.serial('test durable replay-membrane settlement', async t => { nextLife(); const zone1 = makeDurableZone(getBaggage(), 'durableRoot'); - return testFirstPlay(t, zone1); + await testFirstPlay(t, zone1); + + nextLife(); + const zone3 = makeDurableZone(getBaggage(), 'durableRoot'); + return testReplay(t, zone3); });