Skip to content

Commit

Permalink
fixup! close
Browse files Browse the repository at this point in the history
  • Loading branch information
erights committed Mar 22, 2024
1 parent b221443 commit 9774ce3
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 158 deletions.
244 changes: 132 additions & 112 deletions packages/zone/src/async-flow/async-flow.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Fail } from '@endo/errors';
import { Fail, q } from '@endo/errors';
import { E } from '@endo/far';
import { M } from '@endo/patterns';
import { VowShape } from '@agoric/vow';
import { PromiseWatcherI } from '@agoric/vow/src/watch-promise.js';
import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js';
import { makeReplayMembrane } from './replay-membrane.js';
import { prepareLogStore } from './log-store.js';
import { prepareWeakBijection } from './weak-bijection.js';
import { vowishKey, prepareWeakBijection } from './weak-bijection.js';
import { makeEphemera } from './ephemera.js';
import { LogEntryShape } from './type-guards.js';

// const { defineProperties } = Object;
const { defineProperties } = Object;
const { apply } = Reflect;

const AsyncFlowIKit = harden({
Expand All @@ -32,6 +32,7 @@ const AsyncFlowIKit = harden({
const AdminAsyncFlowI = M.interface('AsyncFlowAdmin', {
getFailures: M.call().returns(M.mapOf(M.remotable('asyncFlow'), M.error())),
wakeAll: M.call().returns(),
getFlowForOutcomeVow: M.call(VowShape).returns(M.opt(M.remotable('flow'))),
});

/**
Expand Down Expand Up @@ -61,27 +62,33 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
), // initialized by restart
}));

const adminAsyncFlow = outerZone.exo('AdminAsyncFlow', AdminAsyncFlowI, {
getFailures() {
return failures.snapshot();
},
wakeAll() {
for (const asyncFlow of eagerWakers.keys()) {
asyncFlow.wake();
}
},
/**
* So we can give out wrapper functions easily and recover flow objects
* for their activations later.
*/
const flowForOutcomeVowKey = outerZone.mapStore('flowForOutcomeVow', {
keyShape: M.remotable('vowishKey'),
valueShape: M.remotable('asyncFlow'),
});

/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {string} tag
* @param {GuestAsyncFunc} guestAsyncFunc
* @param {GuestAsyncFunc} [optGuestAsyncFunc]
* @param {{ startEager?: boolean }} [options]
*/
const prepareAsyncFlowKit = (zone, tag, guestAsyncFunc, options = {}) => {
const { startEager = true } = options;

const makeAsyncFlowKit = zone.exoClassKit(
const prepareAsyncFlowKit = (
zone,
tag,
optGuestAsyncFunc = undefined,
options = {},
) => {
const {
// May change default to false, once instances reliably wake up
startEager = true,
} = options;

const internalMakeAsyncFlowKit = zone.exoClassKit(
tag,
AsyncFlowIKit,
(activationThis, activationArgs) => {
Expand All @@ -101,7 +108,12 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
},
{
flow: {
restart(func = guestAsyncFunc, eager = startEager) {
restart(func = optGuestAsyncFunc, eager = startEager) {
if (func === undefined) {
throw Fail`Function must either be in prepareAsyncFlowKit or restart args: ${q(
tag,
)}`;
}
const { state, facets } = this;
const {
activationThis,
Expand All @@ -112,7 +124,9 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
} = state;
const { flow, admin, wakeWatcher } = facets;

!state.isDone || Fail`Cannot restart a done flow ${flow}`;
!state.isDone ||
// separate line so I can set a breakpoint
Fail`Cannot restart a done flow ${flow}`;

admin.reset();
if (eager) {
Expand Down Expand Up @@ -143,7 +157,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {

// In case some host promises were settled before the guest makes
// the first call to a host object.
void membrane.wake();
membrane.wake();

// We do *not* call the guesAsyncFunc by having the membrane make
// a host wrapper for the function. Rather, we special case this
Expand All @@ -166,29 +180,24 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
// in further turns by `await`ing (or otherwise registering)
// on host vows turned into guest promises, and by calling
// the guest presence of other host objects.
const guestResultPStillGood = () => {
if (bijection.hasGuest(guestResultP)) {
bijection.has(guestResultP, outcomeKit.vow) ||
Fail`unexpected vow ${guestResultP} -> ${bijection.guestToHost(
guestResultP,
)} vs ${outcomeKit.vow}`;
return true;
} else {
return false;
}
};
//
// `bijection.hasGuest(guestResultP)` can be false in a delayed
// guest - to - host setlling from a previous run.
// In that case, the bijection was reset and all guest caps
// created in the previous run were unregistered,
// including `guestResultP`.
void E.when(
guestResultP,
gFulfillment => {
if (guestResultPStillGood()) {
if (bijection.hasGuest(guestResultP)) {
outcomeKit.resolver.resolve(
membrane.guestToHost(gFulfillment),
);
admin.done();
}
},
guestReason => {
if (guestResultPStillGood()) {
if (bijection.hasGuest(guestResultP)) {
outcomeKit.resolver.reject(membrane.guestToHost(guestReason));
admin.done();
}
Expand All @@ -205,14 +214,18 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
if (state.isDone) {
return;
}
void tmp.for(flow).membrane.wake();
if (tmp.for(flow).membrane) {
tmp.for(flow).membrane.wake();
} else {
flow.restart();
}
},
getOutcome() {
const { state, facets } = this;
const { outcomeKit } = state;
const { flow } = facets;

void flow.wake();
flow.wake();
return outcomeKit.vow;
},
dump() {
Expand All @@ -238,7 +251,11 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
failures.delete(flow);
}
if (eagerWakers.has(flow)) {
eagerWakers.delete(flow);
// For now, once an eagerWaker, always an eagerWaker
// eagerWakers.delete(flow);
}
if (tmp.for(flow).membrane) {
tmp.for(flow).membrane.stop();
}
tmp.resetFor(flow);
log.reset();
Expand All @@ -248,41 +265,110 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
},
done() {
const { state, facets } = this;
const { log } = state;
const { admin } = facets;

admin.reset();
state.isDone = true;
log.dispose();
},
panic(fatalProblem) {
const { facets } = this;
const { state, facets } = this;
const { bijection, log } = state;
const { flow } = facets;

if (failures.has(flow)) {
failures.set(flow, fatalProblem);
} else {
failures.init(flow, fatalProblem);
}
throw fatalProblem;

if (tmp.for(flow).membrane) {
tmp.for(flow).membrane.stop();
}
tmp.resetFor(flow);
log.reset();
bijection.reset();

// This is a non-sensical return value, so arbitrary chaos
// may ensue from returning it. But at this point
// we should have successfully isolated this activation from
// having any observable effects on the host, aside from
// console logging and
// resource exhaustion, including infinite loops
return undefined;
},
},
wakeWatcher: {
onFulfilled(_fulfillment) {
const { facets } = this;
void facets.flow.wake();
facets.flow.wake();
},
onRejected(_fulfillment) {
const { facets } = this;
void facets.flow.wake();
facets.flow.wake();
},
},
},
);
const makeAsyncFlowKit = (activationThis, activationArgs) => {
const asyncFlowKit = internalMakeAsyncFlowKit(
activationThis,
activationArgs,
);
asyncFlowKit.flow.restart();
return asyncFlowKit;
};
return harden(makeAsyncFlowKit);
};

/**
* @param {import('@agoric/base-zone').Zone} zone
* @param {string} tag
* @param {GuestAsyncFunc} guestFunc
* @param {{ startEager?: boolean }} [options]
* @returns {HostAsyncFuncWrapper}
*/
const asyncFlow = (zone, tag, guestFunc, options = undefined) => {
const makeAsyncFlowKit = prepareAsyncFlowKit(zone, tag, guestFunc, options);
const hostFuncName = `${guestFunc.name || 'anon'}_hostWrapper`;
const wrapperFunc = {
[hostFuncName](...args) {
const { flow } = makeAsyncFlowKit(this, args);
const outcomeVow = flow.getOutcome();
flowForOutcomeVowKey.init(vowishKey(outcomeVow), flow);
return outcomeVow;
},
}[hostFuncName];
defineProperties(wrapperFunc, {
length: { value: guestFunc.length },
});
return harden(wrapperFunc);
};

const adminAsyncFlow = outerZone.exo('AdminAsyncFlow', AdminAsyncFlowI, {
getFailures() {
return failures.snapshot();
},
wakeAll() {
// [...stuff.keys()] in order to snapshot before iterating
const flowsToWake = [...failures.keys(), ...eagerWakers.keys()];
for (const flow of flowsToWake) {
flow.wake();
}
},
getFlowForOutcomeVow(outcomeVow) {
return flowForOutcomeVowKey.get(vowishKey(outcomeVow));
},
});

// Cannot call this until everything is prepared
// adminAsyncFlow.wakeAll();

return harden({
adminAsyncFlow,
prepareAsyncFlowKit,
asyncFlow,
adminAsyncFlow,
});
};
harden(prepareAsyncFlowTools);
Expand All @@ -296,79 +382,13 @@ harden(prepareAsyncFlowTools);
*/

/**
* @typedef {ReturnType<ReturnType<AsyncFlowTools['prepareAsyncFlowKit']>>} AsyncFlowKit
* @typedef {ReturnType<AsyncFlowTools['prepareAsyncFlowKit']>} MakeAsyncFlowKit
*/

/**
* @typedef {AsyncFlowKit['flow']} AsyncFlow
* @typedef {ReturnType<MakeAsyncFlowKit>} AsyncFlowKit
*/

// /**
// * @param {import('@agoric/base-zone').Zone} zone
// * @param {string} tag
// * @param {GuestAsyncFunc} guestAsyncFunc
// * @param {PreparationOptions} [options]
// */
// export const prepareAsyncFlow = (
// zone,
// tag,
// guestAsyncFunc,
// options = undefined,
// ) => {
// const makeAsyncFlowKit = prepareAsyncFlowKit(
// zone,
// tag,
// guestAsyncFunc,
// options,
// );

// const hostFuncName = `${guestAsyncFunc.name || 'anon'}_hostWrapper`;
// const makeAsyncFlow = {
// [hostFuncName](...args) {
// const { flow } = makeAsyncFlowKit(this, args);
// return flow;
// },
// }[hostFuncName];

// defineProperties(makeAsyncFlow, {
// length: { value: guestAsyncFunc.length },
// });

// return harden(makeAsyncFlow);
// };
// harden(prepareAsyncFlow);

// /**
// * @param {import('@agoric/base-zone').Zone} zone
// * @param {string} tag
// * @param {GuestAsyncFunc} guestAsyncFunc
// * @param {PreparationOptions} [options]
// */
// export const prepareAsyncFlowFunc = (
// zone,
// tag,
// guestAsyncFunc,
// options = undefined,
// ) => {
// const makeAsyncFlowKit = prepareAsyncFlowKit(
// zone,
// tag,
// guestAsyncFunc,
// options,
// );

// const hostFuncName = `${guestAsyncFunc.name || 'anon'}_hostWrapper`;
// const makeAsyncFlow = {
// [hostFuncName](...args) {
// const { flow } = makeAsyncFlowKit(this, args);
// return flow.getOutcome();
// },
// }[hostFuncName];

// defineProperties(makeAsyncFlow, {
// length: { value: guestAsyncFunc.length },
// });

// return harden(makeAsyncFlow);
// };
// harden(prepareAsyncFlow);
/**
* @typedef {AsyncFlowKit['flow']} AsyncFlow
*/
4 changes: 3 additions & 1 deletion packages/zone/src/async-flow/equate.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ export const makeEquate = bijection => {

const innerEquate = (g, h) => {
if (!isObject(g)) {
is(g, h) || Fail`unequal primitives ${g} vs ${h}`;
is(g, h) ||
// separate line so I can set a breakpoint
Fail`unequal primitives ${g} vs ${h}`;
return;
}
if (bijection.has(g, h)) {
Expand Down
Loading

0 comments on commit 9774ce3

Please sign in to comment.