diff --git a/packages/async-flow/src/async-flow.js b/packages/async-flow/src/async-flow.js index 52b94eeb0e6..2c4fe7e1b5a 100644 --- a/packages/async-flow/src/async-flow.js +++ b/packages/async-flow/src/async-flow.js @@ -8,12 +8,13 @@ import { makeReplayMembrane } from './replay-membrane.js'; import { prepareLogStore } from './log-store.js'; import { prepareWeakBijection } from './weak-bijection.js'; import { makeEphemera } from './ephemera.js'; -import { LogEntryShape } from './type-guards.js'; +import { LogEntryShape, FlowStateShape } from './type-guards.js'; const { defineProperties } = Object; const AsyncFlowIKit = harden({ flow: M.interface('Flow', { + getFlowState: M.call().returns(FlowStateShape), restart: M.call().optional(M.boolean()).returns(), wake: M.call().returns(), getOutcome: M.call().returns(VowShape), @@ -22,7 +23,7 @@ const AsyncFlowIKit = harden({ }), admin: M.interface('FlowAdmin', { reset: M.call().returns(), - done: M.call().returns(), + complete: M.call().returns(), panic: M.call(M.error()).returns(M.not(M.any())), // only throws }), wakeWatcher: PromiseWatcherI, @@ -103,6 +104,42 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { }, { flow: { + /** + * @returns {FlowState} + */ + getFlowState() { + const { state, facets } = this; + const { log, outcomeKit, isDone } = state; + const { flow } = facets; + const eph = tmp.for(flow); + + if (isDone) { + eph.membrane === undefined || + Fail`Done flow must drop membrane ${flow} ${eph.membrane}`; + !failures.has(flow) || + Fail`Done flow must not be in failures ${flow} ${failures.get(flow)}`; + !eagerWakers.has(flow) || + Fail`Done flow must not be in eagerWakers ${flow}`; + !flowForOutcomeVowKey.has(outcomeKit.vow) || + Fail`Done flow must drop flow lookup from vow ${outcomeKit.vow}`; + (log.getIndex() === 0 && log.getLength() === 0) || + Fail`Done flow must empty log ${flow} ${log}`; + return 'Done'; + } + if (failures.has(flow)) { + return 'Failed'; + } + if (eph.membrane === undefined) { + log.getIndex() === 0 || + Fail`Sleeping flow must play from log start ${flow} ${log.getIndex()}`; + return 'Sleeping'; + } + if (log.isReplaying()) { + return 'Replaying'; + } + return 'Running'; + }, + /** * Calls the guest function, either for the initial run or at the * start of a replay. @@ -114,7 +151,9 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { const { activationArgs, log, bijection, outcomeKit } = state; const { flow, admin, wakeWatcher } = facets; - !state.isDone || + const startFlowState = flow.getFlowState(); + + startFlowState !== 'Done' || // separate line so I can set a breakpoint Fail`Cannot restart a done flow ${flow}`; @@ -145,7 +184,12 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { eph.membrane = membrane; const guestArgs = membrane.hostToGuest(activationArgs); - // In case some host promises were settled before the guest makes + const flowState = flow.getFlowState(); + flowState === 'Running' || + flowState === 'Replaying' || + Fail`Restarted flow must be Running or Replaying ${flow}`; + + // In case some host vows were settled before the guest makes // the first call to a host object. membrane.wake(); @@ -183,7 +227,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { outcomeKit.resolver.resolve( membrane.guestToHost(gFulfillment), ); - admin.done(); + admin.complete(); } }, guestReason => { @@ -196,7 +240,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { // so this leave the outcome vow unsettled, as it must. if (bijection.hasGuest(guestResultP)) { outcomeKit.resolver.reject(membrane.guestToHost(guestReason)); - admin.done(); + admin.complete(); } }, ); @@ -255,7 +299,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { state.isDone = false; }, - done() { + complete() { const { state, facets } = this; const { log } = state; const { flow, admin } = facets; @@ -267,6 +311,8 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { flowForOutcomeVowKey.delete(vowishKey(flow.getOutcome())); state.isDone = true; log.dispose(); + flow.getFlowState() === 'Done' || + Fail`Complete flow must be Done ${flow}`; }, panic(fatalProblem) { const { state, facets } = this; @@ -287,6 +333,9 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { log.reset(); bijection.reset(); + flow.getFlowState() === 'Failed' || + Fail`Paniced flow must be Failed ${flow}`; + // This is not an expected throw, so in theory arbitrary chaos // may ensue from throwing it. But at this point // we should have successfully isolated this activation from @@ -314,7 +363,11 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { ); const makeAsyncFlowKit = activationArgs => { const asyncFlowKit = internalMakeAsyncFlowKit(activationArgs); - asyncFlowKit.flow.restart(); + const { flow } = asyncFlowKit; + + const vow = vowishKey(flow.getOutcome()); + flowForOutcomeVowKey.init(vowishKey(vow), flow); + flow.restart(); return asyncFlowKit; }; return harden(makeAsyncFlowKit); @@ -333,9 +386,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => { const wrapperFunc = { [hostFuncName](...args) { const { flow } = makeAsyncFlowKit(args); - const outcomeVow = flow.getOutcome(); - flowForOutcomeVowKey.init(vowishKey(outcomeVow), flow); - return outcomeVow; + return flow.getOutcome(); }, }[hostFuncName]; defineProperties(wrapperFunc, { diff --git a/packages/async-flow/src/type-guards.js b/packages/async-flow/src/type-guards.js index c8ac6365043..65551bd5f89 100644 --- a/packages/async-flow/src/type-guards.js +++ b/packages/async-flow/src/type-guards.js @@ -1,6 +1,14 @@ import { M } from '@endo/patterns'; import { VowShape } from '@agoric/vow'; +export const FlowStateShape = M.or( + 'Running', + 'Sleeping', + 'Replaying', + 'Failed', + 'Done', +); + export const PropertyKeyShape = M.or(M.string(), M.symbol()); export const LogEntryShape = M.or(