From 9774ce3699e42a8c67e7c97466732edc5efe0961 Mon Sep 17 00:00:00 2001 From: "Mark S. Miller" Date: Thu, 21 Mar 2024 17:35:28 -0700 Subject: [PATCH] fixup! close --- packages/zone/src/async-flow/async-flow.js | 244 ++++++++++-------- packages/zone/src/async-flow/equate.js | 4 +- packages/zone/src/async-flow/log-store.js | 10 +- .../zone/src/async-flow/replay-membrane.js | 57 +++- .../zone/src/async-flow/weak-bijection.js | 12 +- .../zone/test/async-flow/test-async-flow.js | 222 +++++++++++++--- 6 files changed, 391 insertions(+), 158 deletions(-) diff --git a/packages/zone/src/async-flow/async-flow.js b/packages/zone/src/async-flow/async-flow.js index b52b4e50980..7bab5df59c6 100644 --- a/packages/zone/src/async-flow/async-flow.js +++ b/packages/zone/src/async-flow/async-flow.js @@ -1,4 +1,4 @@ -import { Fail } from '@endo/errors'; +import { Fail, q } from '@endo/errors'; import { E } from '@endo/far'; import { M } from '@endo/patterns'; import { VowShape } from '@agoric/vow'; @@ -6,11 +6,11 @@ import { PromiseWatcherI } from '@agoric/vow/src/watch-promise.js'; import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; import { makeReplayMembrane } from './replay-membrane.js'; import { prepareLogStore } from './log-store.js'; -import { prepareWeakBijection } from './weak-bijection.js'; +import { vowishKey, prepareWeakBijection } from './weak-bijection.js'; import { makeEphemera } from './ephemera.js'; import { LogEntryShape } from './type-guards.js'; -// const { defineProperties } = Object; +const { defineProperties } = Object; const { apply } = Reflect; const AsyncFlowIKit = harden({ @@ -32,6 +32,7 @@ const AsyncFlowIKit = harden({ const AdminAsyncFlowI = M.interface('AsyncFlowAdmin', { getFailures: M.call().returns(M.mapOf(M.remotable('asyncFlow'), M.error())), wakeAll: M.call().returns(), + getFlowForOutcomeVow: M.call(VowShape).returns(M.opt(M.remotable('flow'))), }); /** @@ -61,27 +62,33 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { ), // initialized by restart })); - const adminAsyncFlow = outerZone.exo('AdminAsyncFlow', AdminAsyncFlowI, { - getFailures() { - return failures.snapshot(); - }, - wakeAll() { - for (const asyncFlow of eagerWakers.keys()) { - asyncFlow.wake(); - } - }, + /** + * So we can give out wrapper functions easily and recover flow objects + * for their activations later. + */ + const flowForOutcomeVowKey = outerZone.mapStore('flowForOutcomeVow', { + keyShape: M.remotable('vowishKey'), + valueShape: M.remotable('asyncFlow'), }); /** * @param {import('@agoric/base-zone').Zone} zone * @param {string} tag - * @param {GuestAsyncFunc} guestAsyncFunc + * @param {GuestAsyncFunc} [optGuestAsyncFunc] * @param {{ startEager?: boolean }} [options] */ - const prepareAsyncFlowKit = (zone, tag, guestAsyncFunc, options = {}) => { - const { startEager = true } = options; - - const makeAsyncFlowKit = zone.exoClassKit( + const prepareAsyncFlowKit = ( + zone, + tag, + optGuestAsyncFunc = undefined, + options = {}, + ) => { + const { + // May change default to false, once instances reliably wake up + startEager = true, + } = options; + + const internalMakeAsyncFlowKit = zone.exoClassKit( tag, AsyncFlowIKit, (activationThis, activationArgs) => { @@ -101,7 +108,12 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { }, { flow: { - restart(func = guestAsyncFunc, eager = startEager) { + restart(func = optGuestAsyncFunc, eager = startEager) { + if (func === undefined) { + throw Fail`Function must either be in prepareAsyncFlowKit or restart args: ${q( + tag, + )}`; + } const { state, facets } = this; const { activationThis, @@ -112,7 +124,9 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { } = state; const { flow, admin, wakeWatcher } = facets; - !state.isDone || Fail`Cannot restart a done flow ${flow}`; + !state.isDone || + // separate line so I can set a breakpoint + Fail`Cannot restart a done flow ${flow}`; admin.reset(); if (eager) { @@ -143,7 +157,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { // In case some host promises were settled before the guest makes // the first call to a host object. - void membrane.wake(); + membrane.wake(); // We do *not* call the guesAsyncFunc by having the membrane make // a host wrapper for the function. Rather, we special case this @@ -166,21 +180,16 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { // in further turns by `await`ing (or otherwise registering) // on host vows turned into guest promises, and by calling // the guest presence of other host objects. - const guestResultPStillGood = () => { - if (bijection.hasGuest(guestResultP)) { - bijection.has(guestResultP, outcomeKit.vow) || - Fail`unexpected vow ${guestResultP} -> ${bijection.guestToHost( - guestResultP, - )} vs ${outcomeKit.vow}`; - return true; - } else { - return false; - } - }; + // + // `bijection.hasGuest(guestResultP)` can be false in a delayed + // guest - to - host setlling from a previous run. + // In that case, the bijection was reset and all guest caps + // created in the previous run were unregistered, + // including `guestResultP`. void E.when( guestResultP, gFulfillment => { - if (guestResultPStillGood()) { + if (bijection.hasGuest(guestResultP)) { outcomeKit.resolver.resolve( membrane.guestToHost(gFulfillment), ); @@ -188,7 +197,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { } }, guestReason => { - if (guestResultPStillGood()) { + if (bijection.hasGuest(guestResultP)) { outcomeKit.resolver.reject(membrane.guestToHost(guestReason)); admin.done(); } @@ -205,14 +214,18 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { if (state.isDone) { return; } - void tmp.for(flow).membrane.wake(); + if (tmp.for(flow).membrane) { + tmp.for(flow).membrane.wake(); + } else { + flow.restart(); + } }, getOutcome() { const { state, facets } = this; const { outcomeKit } = state; const { flow } = facets; - void flow.wake(); + flow.wake(); return outcomeKit.vow; }, dump() { @@ -238,7 +251,11 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { failures.delete(flow); } if (eagerWakers.has(flow)) { - eagerWakers.delete(flow); + // For now, once an eagerWaker, always an eagerWaker + // eagerWakers.delete(flow); + } + if (tmp.for(flow).membrane) { + tmp.for(flow).membrane.stop(); } tmp.resetFor(flow); log.reset(); @@ -248,13 +265,16 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { }, done() { const { state, facets } = this; + const { log } = state; const { admin } = facets; admin.reset(); state.isDone = true; + log.dispose(); }, panic(fatalProblem) { - const { facets } = this; + const { state, facets } = this; + const { bijection, log } = state; const { flow } = facets; if (failures.has(flow)) { @@ -262,27 +282,93 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { } else { failures.init(flow, fatalProblem); } - throw fatalProblem; + + if (tmp.for(flow).membrane) { + tmp.for(flow).membrane.stop(); + } + tmp.resetFor(flow); + log.reset(); + bijection.reset(); + + // This is a non-sensical return value, so arbitrary chaos + // may ensue from returning it. But at this point + // we should have successfully isolated this activation from + // having any observable effects on the host, aside from + // console logging and + // resource exhaustion, including infinite loops + return undefined; }, }, wakeWatcher: { onFulfilled(_fulfillment) { const { facets } = this; - void facets.flow.wake(); + facets.flow.wake(); }, onRejected(_fulfillment) { const { facets } = this; - void facets.flow.wake(); + facets.flow.wake(); }, }, }, ); + const makeAsyncFlowKit = (activationThis, activationArgs) => { + const asyncFlowKit = internalMakeAsyncFlowKit( + activationThis, + activationArgs, + ); + asyncFlowKit.flow.restart(); + return asyncFlowKit; + }; return harden(makeAsyncFlowKit); }; + /** + * @param {import('@agoric/base-zone').Zone} zone + * @param {string} tag + * @param {GuestAsyncFunc} guestFunc + * @param {{ startEager?: boolean }} [options] + * @returns {HostAsyncFuncWrapper} + */ + const asyncFlow = (zone, tag, guestFunc, options = undefined) => { + const makeAsyncFlowKit = prepareAsyncFlowKit(zone, tag, guestFunc, options); + const hostFuncName = `${guestFunc.name || 'anon'}_hostWrapper`; + const wrapperFunc = { + [hostFuncName](...args) { + const { flow } = makeAsyncFlowKit(this, args); + const outcomeVow = flow.getOutcome(); + flowForOutcomeVowKey.init(vowishKey(outcomeVow), flow); + return outcomeVow; + }, + }[hostFuncName]; + defineProperties(wrapperFunc, { + length: { value: guestFunc.length }, + }); + return harden(wrapperFunc); + }; + + const adminAsyncFlow = outerZone.exo('AdminAsyncFlow', AdminAsyncFlowI, { + getFailures() { + return failures.snapshot(); + }, + wakeAll() { + // [...stuff.keys()] in order to snapshot before iterating + const flowsToWake = [...failures.keys(), ...eagerWakers.keys()]; + for (const flow of flowsToWake) { + flow.wake(); + } + }, + getFlowForOutcomeVow(outcomeVow) { + return flowForOutcomeVowKey.get(vowishKey(outcomeVow)); + }, + }); + + // Cannot call this until everything is prepared + // adminAsyncFlow.wakeAll(); + return harden({ - adminAsyncFlow, prepareAsyncFlowKit, + asyncFlow, + adminAsyncFlow, }); }; harden(prepareAsyncFlowTools); @@ -296,79 +382,13 @@ harden(prepareAsyncFlowTools); */ /** - * @typedef {ReturnType>} AsyncFlowKit + * @typedef {ReturnType} MakeAsyncFlowKit */ /** - * @typedef {AsyncFlowKit['flow']} AsyncFlow + * @typedef {ReturnType} AsyncFlowKit */ -// /** -// * @param {import('@agoric/base-zone').Zone} zone -// * @param {string} tag -// * @param {GuestAsyncFunc} guestAsyncFunc -// * @param {PreparationOptions} [options] -// */ -// export const prepareAsyncFlow = ( -// zone, -// tag, -// guestAsyncFunc, -// options = undefined, -// ) => { -// const makeAsyncFlowKit = prepareAsyncFlowKit( -// zone, -// tag, -// guestAsyncFunc, -// options, -// ); - -// const hostFuncName = `${guestAsyncFunc.name || 'anon'}_hostWrapper`; -// const makeAsyncFlow = { -// [hostFuncName](...args) { -// const { flow } = makeAsyncFlowKit(this, args); -// return flow; -// }, -// }[hostFuncName]; - -// defineProperties(makeAsyncFlow, { -// length: { value: guestAsyncFunc.length }, -// }); - -// return harden(makeAsyncFlow); -// }; -// harden(prepareAsyncFlow); - -// /** -// * @param {import('@agoric/base-zone').Zone} zone -// * @param {string} tag -// * @param {GuestAsyncFunc} guestAsyncFunc -// * @param {PreparationOptions} [options] -// */ -// export const prepareAsyncFlowFunc = ( -// zone, -// tag, -// guestAsyncFunc, -// options = undefined, -// ) => { -// const makeAsyncFlowKit = prepareAsyncFlowKit( -// zone, -// tag, -// guestAsyncFunc, -// options, -// ); - -// const hostFuncName = `${guestAsyncFunc.name || 'anon'}_hostWrapper`; -// const makeAsyncFlow = { -// [hostFuncName](...args) { -// const { flow } = makeAsyncFlowKit(this, args); -// return flow.getOutcome(); -// }, -// }[hostFuncName]; - -// defineProperties(makeAsyncFlow, { -// length: { value: guestAsyncFunc.length }, -// }); - -// return harden(makeAsyncFlow); -// }; -// harden(prepareAsyncFlow); +/** + * @typedef {AsyncFlowKit['flow']} AsyncFlow + */ diff --git a/packages/zone/src/async-flow/equate.js b/packages/zone/src/async-flow/equate.js index 581da152ed0..62134ff5c08 100644 --- a/packages/zone/src/async-flow/equate.js +++ b/packages/zone/src/async-flow/equate.js @@ -26,7 +26,9 @@ export const makeEquate = bijection => { const innerEquate = (g, h) => { if (!isObject(g)) { - is(g, h) || Fail`unequal primitives ${g} vs ${h}`; + is(g, h) || + // separate line so I can set a breakpoint + Fail`unequal primitives ${g} vs ${h}`; return; } if (bijection.has(g, h)) { diff --git a/packages/zone/src/async-flow/log-store.js b/packages/zone/src/async-flow/log-store.js index 1bd2fc65140..e204ea90f17 100644 --- a/packages/zone/src/async-flow/log-store.js +++ b/packages/zone/src/async-flow/log-store.js @@ -6,7 +6,7 @@ import { makeEphemera } from './ephemera.js'; const LogStoreI = M.interface('LogStore', { reset: M.call().returns(), - restart: M.call().returns(), + dispose: M.call().returns(), getIndex: M.call().returns(M.number()), getLength: M.call().returns(M.number()), isReplaying: M.call().returns(M.boolean()), @@ -61,13 +61,13 @@ export const prepareLogStore = zone => { }, { reset() { - const { state, self } = this; + const { self } = this; tmp.resetFor(self); - state.mapStore.clear(); }, - restart() { - const { self } = this; + dispose() { + const { state, self } = this; tmp.resetFor(self); + state.mapStore.clear(); }, getIndex() { const { self } = this; diff --git a/packages/zone/src/async-flow/replay-membrane.js b/packages/zone/src/async-flow/replay-membrane.js index 11b7439281a..44b30e909ec 100644 --- a/packages/zone/src/async-flow/replay-membrane.js +++ b/packages/zone/src/async-flow/replay-membrane.js @@ -1,5 +1,5 @@ /* eslint-disable no-use-before-define */ -import { Fail, q } from '@endo/errors'; +import { Fail, b, q } from '@endo/errors'; import { Remotable, getInterfaceOf } from '@endo/pass-style'; import { E } from '@endo/eventual-send'; import { getMethodNames } from '@endo/eventual-send/utils.js'; @@ -23,7 +23,7 @@ export const makeReplayMembrane = ( watchWake, panic, ) => { - const { watch: _watch, when, makeVowKit: _makeVowKit } = vowTools; + const { when } = vowTools; const equate = makeEquate(bijection); @@ -126,6 +126,30 @@ export const makeReplayMembrane = ( }; const guestCallsHost = (guestTarget, optVerb, guestArgs, callIndex) => { + if (!bijection.hasGuest(guestTarget)) { + // This happens in a delayed guest-to-host call from a previous run. + // In that case, the bijection was reset and all guest caps + // created in the previous run were unregistered, + // including guestTarget. + // Throwing an error back to the old guest caller may cause + // it to proceed in all sorts of crazy ways. But that old run + // should now be isolated and unable to cause any observable effects. + // Well, except for resource exhaustion including infinite loops, + // which would be a genuine problem. + // + // Console logging of unhandled rejections, errors thrown to the top + // of the event loop, or anything else are not problematic effects. + // At this level of abstraction, we don't consider console logging + // activity to be observable. Thus, it is also ok for the guest + // function, which should otherwise be closed, to + // capture (lexically "close over") the `console`. + const extraDiagnostic = + callStack.length === 0 + ? '' + : // This case should only happen when the callStack is empty + ` with non-empty callstack ${q(callStack)};`; + Fail`Called from a previous run: ${guestTarget}${b(extraDiagnostic)}`; + } /** @type {Outcome} */ let outcome; try { @@ -148,7 +172,7 @@ export const makeReplayMembrane = ( outcome = performCall(...args); } } catch (fatalError) { - throw panic(fatalError); + return panic(fatalError); } if (outcome.kind === 'return') { @@ -228,7 +252,7 @@ export const makeReplayMembrane = ( } }, hostReason => { - if (!log.isReplaying()) { + if (!log.isReplaying() && guestPromiseMap.get(promise) !== 'settled') { /** @type {LogEntry} */ const entry = harden(['doReject', hVow, hostReason]); log.pushEntry(entry); @@ -268,7 +292,9 @@ export const makeReplayMembrane = ( const interpretOne = (dispatch, [op, ...args]) => { try { - op in dispatch || Fail`unexpected dispatch op: ${q(op)}`; + op in dispatch || + // separate line so I can set a breakpoint + Fail`unexpected dispatch op: ${q(op)}`; return dispatch[op](...args); } catch (problem) { throw panic(problem); @@ -294,7 +320,9 @@ export const makeReplayMembrane = ( const entry = log.nextEntry(); const optOutcome = interpretOne(nestDispatch, entry); if (unnestFlag) { - optOutcome || Fail`only unnest with an outcome: ${q(entry[0])}`; + optOutcome || + // separate line so I can set a breakpoint + Fail`only unnest with an outcome: ${q(entry[0])}`; unnestFlag = false; return optOutcome; } @@ -306,17 +334,25 @@ export const makeReplayMembrane = ( * @param {number} callIndex */ const unnestInterpreter = callIndex => { - callStack.length >= 1 || Fail`Unmatched unnest: ${q(callIndex)}`; + callStack.length >= 1 || + // separate line so I can set a breakpoint + Fail`Unmatched unnest: ${q(callIndex)}`; const i = callStack.pop(); - i === callIndex || Fail`Unexpected unnest: ${q(callIndex)} vs ${q(i)}`; + i === callIndex || + // separate line so I can set a breakpoint + Fail`Unexpected unnest: ${q(callIndex)} vs ${q(i)}`; unnestFlag = true; if (callStack.length === 0) { void E.when(undefined, wake); } }; + let stopped = false; + const wake = () => { while (log.isReplaying()) { + !stopped || + Fail`This membrane stopped. Restart with new membrane ${replayMembrane}`; callStack.length === 0 || Fail`wake only with empty callStack: ${q(callStack)}`; const entry = log.peekEntry(); @@ -329,10 +365,15 @@ export const makeReplayMembrane = ( } }; + const stop = () => { + stopped = true; + }; + const replayMembrane = harden({ hostToGuest, guestToHost, wake, + stop, }); return replayMembrane; }; diff --git a/packages/zone/src/async-flow/weak-bijection.js b/packages/zone/src/async-flow/weak-bijection.js index 165dd17c142..72a02d180ac 100644 --- a/packages/zone/src/async-flow/weak-bijection.js +++ b/packages/zone/src/async-flow/weak-bijection.js @@ -42,13 +42,17 @@ const makeVowishStore = name => { return Far(name, { init: (k, v) => { const k2 = vowishKey(k); - !map.has(k2) || Fail`key already bound: ${k} -> ${map.get(k2)} vs ${v}`; + !map.has(k2) || + // separate line so I can set a breakpoint + Fail`key already bound: ${k} -> ${map.get(k2)} vs ${v}`; map.set(k2, v); }, has: k => map.has(vowishKey(k)), get: k => { const k2 = vowishKey(k); - map.has(k2) || Fail`key not found: ${k}`; + map.has(k2) || + // separate line so I can set a breakpoint + Fail`key not found: ${k}`; return map.get(k2); }, }); @@ -75,7 +79,9 @@ export const prepareWeakBijection = zone => { const { self } = this; g2h.for(self).init(g, h); h2g.for(self).init(h, g); - self.has(g, h) || Fail`internal: ${g} <-> ${h}`; + self.has(g, h) || + // separate line so I can set a breakpoint + Fail`internal: ${g} <-> ${h}`; }, define(g, h) { const { self } = this; diff --git a/packages/zone/test/async-flow/test-async-flow.js b/packages/zone/test/async-flow/test-async-flow.js index 6737a763809..3f84b79bdb3 100644 --- a/packages/zone/test/async-flow/test-async-flow.js +++ b/packages/zone/test/async-flow/test-async-flow.js @@ -10,12 +10,13 @@ import { import { Fail } from '@endo/errors'; // import { E } from '@endo/far'; // import E from '@agoric/vow/src/E.js'; +import { passStyleOf } from '@endo/pass-style'; import { eventLoopIteration } from '@agoric/internal/src/testing-utils.js'; import { isVow } from '@agoric/vow/src/vow-utils.js'; import { makePromiseKit } from '@endo/promise-kit'; import { prepareVowTools } from '@agoric/vow'; import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; -import { prepareAsyncFlow } from '../../src/async-flow/async-flow.js'; +import { prepareAsyncFlowTools } from '../../src/async-flow/async-flow.js'; import { makeHeapZone } from '../../heap.js'; import { makeVirtualZone } from '../../virtual.js'; @@ -57,6 +58,9 @@ const prepareOrchestra = (zone, k = 1) => * @param {boolean} [showOnConsole] */ const testFirstPlay = async (t, zone, vowTools, showOnConsole = false) => { + const { asyncFlow, adminAsyncFlow } = prepareAsyncFlowTools(zone, { + vowTools, + }); const makeOrchestra = prepareOrchestra(zone); const { makeVowKit } = vowTools; @@ -98,20 +102,21 @@ const testFirstPlay = async (t, zone, vowTools, showOnConsole = false) => { }, }; - const makeAsyncFlow = prepareAsyncFlow(zone, 'AsyncFlow1', guestMethod, { - vowTools, - }); + const wrapperFunc = asyncFlow(zone, 'AsyncFlow1', guestMethod); - const asyncFlow = zone.makeOnce('asyncFlow', () => - apply(makeAsyncFlow, 'context', [hOrch7, v1]), + const outcomeV = zone.makeOnce('outcomeV', () => + apply(wrapperFunc, 'context', [hOrch7, v1]), ); - const outcomeV = zone.makeOnce('outcomeV', () => asyncFlow.getOutcome()); + t.true(isVow(outcomeV)); r1.resolve('x'); + const flow = adminAsyncFlow.getFlowForOutcomeVow(outcomeV); + t.is(passStyleOf(flow), 'remotable'); + if (showOnConsole) { console.log('done', await promiseTestDone); - console.log('log dump', asyncFlow.dump()); + console.log('log dump', flow.dump()); } return promiseTestDone; }; @@ -126,6 +131,86 @@ const testBadReplay = async (t, zone, vowTools, showOnConsole = false) => { if (showOnConsole) { console.log('badReplay started'); } + const { asyncFlow, adminAsyncFlow } = prepareAsyncFlowTools(zone, { + vowTools, + }); + prepareOrchestra(zone); + const { when } = vowTools; + const hOrch7 = /** @type {Orchestra} */ ( + zone.makeOnce('hOrch7', () => Fail`hOrch7 expected`) + ); + // purposely violate rule that guestMethod is closed. + const { promise: promiseTestDone, resolve: endTest } = makePromiseKit(); + + const { guestMethod } = { + async guestMethod(gOrch7, gP) { + t.is(this, 'context'); + if (showOnConsole) { + console.log('about to await gP'); + } + await gP; + const prod = gOrch7.scale(4); + t.is(prod, 21); + + let gErr; + try { + gOrch7.scale(9n); + } catch (e) { + gErr = e; + } + t.is(gErr.name, 'TypeError'); + + const g2 = gOrch7.vow(); + endTest(true); + if (showOnConsole) { + console.log('about to await g2'); + } + await g2; // awaiting a promise that won't be resolved until this turn + if (showOnConsole) { + console.log('I woke up!'); + } + endTest('done'); + }, + }; + + // `asyncFlow` can be used simply to re-prepare the guest function + // by ignoring the returned wrapper function. If the wrapper function is + // invoked, that would be a *new* activation with a new outcome and + // flow, and would have nothing to do with the existing one. + asyncFlow(zone, 'AsyncFlow1', guestMethod); + + const outcomeV = /** @type {Vow} */ ( + zone.makeOnce('outcomeV', () => Fail`outcomeV expected`) + ); + + hOrch7.resolve('y'); + // TODO I shouldn't need to do this. + await adminAsyncFlow.wakeAll(); + t.is(await when(hOrch7.vow()), 'y'); + + const flow = adminAsyncFlow.getFlowForOutcomeVow(outcomeV); + t.is(passStyleOf(flow), 'remotable'); + + if (showOnConsole) { + console.log('log dump', flow.dump()); + console.log('goodReplay done', await promiseTestDone); + } + return promiseTestDone; +}; + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + * @param {boolean} [showOnConsole] + */ +const testGoodReplay = async (t, zone, vowTools, showOnConsole = false) => { + if (showOnConsole) { + console.log('goodReplay started'); + } + const { asyncFlow, adminAsyncFlow } = prepareAsyncFlowTools(zone, { + vowTools, + }); prepareOrchestra(zone); const { when } = vowTools; const hOrch7 = /** @type {Orchestra} */ ( @@ -165,37 +250,107 @@ const testBadReplay = async (t, zone, vowTools, showOnConsole = false) => { }, }; - prepareAsyncFlow(zone, 'AsyncFlow1', guestMethod, { vowTools }); + // `asyncFlow` can be used simply to re-prepare the guest function + // by ignoring the returned wrapper function. If the wrapper function is + // invoked, that would be a *new* activation with a new outcome and + // flow, and would have nothing to do with the existing one. + asyncFlow(zone, 'AsyncFlow1', guestMethod); - const asyncFlow = - /** @type {import('../../src/async-flow/async-flow.js').AsyncFlow} */ ( - zone.makeOnce('asyncFlow', () => Fail`outcomeV expected`) - ); - // eslint-disable-next-line no-unused-vars const outcomeV = /** @type {Vow} */ ( zone.makeOnce('outcomeV', () => Fail`outcomeV expected`) ); hOrch7.resolve('y'); // TODO I shouldn't need to do this. - await asyncFlow.wake(); + await adminAsyncFlow.wakeAll(); t.is(await when(hOrch7.vow()), 'y'); + const flow = adminAsyncFlow.getFlowForOutcomeVow(outcomeV); + t.is(passStyleOf(flow), 'remotable'); + if (showOnConsole) { - console.log('log dump', asyncFlow.dump()); - console.log('badReplay done', await promiseTestDone); + console.log('log dump', flow.dump()); + console.log('goodReplay done', await promiseTestDone); } return promiseTestDone; }; -// /** -// * @param {any} _t -// * @param {import('@agoric/base-zone').Zone} _zone -// * @param {import('@agoric/vow').VowTools} _vowTools -// */ -// const testGoodReplay = async (_t, _zone, _vowTools) => { -// // -// }; +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + * @param {boolean} [showOnConsole] + */ +const testAfterDoneReplay = async ( + t, + zone, + vowTools, + showOnConsole = false, +) => { + if (showOnConsole) { + console.log('testAfterDoneReplay started'); + } + const { asyncFlow, adminAsyncFlow } = prepareAsyncFlowTools(zone, { + vowTools, + }); + prepareOrchestra(zone); + const { when } = vowTools; + const hOrch7 = /** @type {Orchestra} */ ( + zone.makeOnce('hOrch7', () => Fail`hOrch7 expected`) + ); + + const { guestMethod } = { + async guestMethod(gOrch7, gP) { + t.is(this, 'context'); + if (showOnConsole) { + console.log('about to await gP'); + } + await gP; + const prod = gOrch7.scale(3); + t.is(prod, 21); + + let gErr; + try { + gOrch7.scale(9n); + } catch (e) { + gErr = e; + } + t.is(gErr.name, 'TypeError'); + + const g2 = gOrch7.vow(); + if (showOnConsole) { + console.log('about to await g2'); + } + await g2; // awaiting a promise that won't be resolved until this turn + if (showOnConsole) { + console.log('I woke up!'); + } + }, + }; + + // `asyncFlow` can be used simply to re-prepare the guest function + // by ignoring the returned wrapper function. If the wrapper function is + // invoked, that would be a *new* activation with a new outcome and + // flow, and would have nothing to do with the existing one. + asyncFlow(zone, 'AsyncFlow1', guestMethod); + + const outcomeV = /** @type {Vow} */ ( + zone.makeOnce('outcomeV', () => Fail`outcomeV expected`) + ); + + hOrch7.resolve('y'); + // TODO I shouldn't need to do this. + await adminAsyncFlow.wakeAll(); + t.is(await when(hOrch7.vow()), 'y'); + + const flow = adminAsyncFlow.getFlowForOutcomeVow(outcomeV); + t.is(passStyleOf(flow), 'remotable'); + + if (showOnConsole) { + console.log('log dump', flow.dump()); + console.log('testAfterDoneReplay done'); + } +}; await test.serial('test heap async-flow', async t => { const zone = makeHeapZone('heapRoot'); @@ -227,8 +382,17 @@ await test.serial('test durable async-flow', async t => { await eventLoopIteration(); - // nextLife(); - // const zone3 = makeDurableZone(getBaggage(), 'durableRoot'); - // const vowTools3 = prepareWatchableVowTools(zone3); - // return testGoodReplay(t, zone3, vowTools3); + nextLife(); + const zone3 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools3 = prepareWatchableVowTools(zone3); + await testGoodReplay(t, zone3, vowTools3); + + await eventLoopIteration(); + + nextLife(); + const zone4 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools4 = prepareWatchableVowTools(zone4); + await testAfterDoneReplay(t, zone4, vowTools4, asyncFlowVerbose()); + + await eventLoopIteration(); });