Skip to content

Commit

Permalink
feat(asyncFlow): E support
Browse files Browse the repository at this point in the history
  • Loading branch information
erights committed Jun 9, 2024
1 parent b1796b5 commit a461f24
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 22 deletions.
127 changes: 118 additions & 9 deletions packages/async-flow/src/replay-membrane.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { makeConvertKit } from './convert.js';

/**
* @import {PromiseKit} from '@endo/promise-kit'
* @import {PassableCap} from '@endo/pass-style'
* @import {VowKit} from '@agoric/vow'
*/

const { fromEntries, defineProperties, assign } = Object;
Expand All @@ -26,7 +28,7 @@ export const makeReplayMembrane = (
watchWake,
panic,
) => {
const { when } = vowTools;
const { when, makeVowKit } = vowTools;

const equate = makeEquate(bijection);

Expand Down Expand Up @@ -208,12 +210,111 @@ export const makeReplayMembrane = (

// //////////////// Eventual Send ////////////////////////////////////////////

/**
* @param {PassableCap} hostTarget
* @param {string | undefined} optVerb
* @param {Passable[]} hostArgs
* @param {number} callIndex
* @param {VowKit} hostResultKit
* @param {Promise} guestReturnedP
* @returns {Outcome}
*/
const performSend = (
hostTarget,
optVerb,
hostArgs,
callIndex,
hostResultKit,
guestReturnedP,
) => {
const { vow, resolver } = hostResultKit;
try {
const hostPromise = optVerb
? E(hostTarget)[optVerb](...hostArgs)
: E(hostTarget)(...hostArgs);
resolver.resolve(hostPromise); // TODO does this always work?
} catch (hostProblem) {
throw Fail`internal: eventual send synchrously failed ${hostProblem}`;
}
try {
const entry = harden(['doReturn', callIndex, vow]);
log.pushEntry(entry);
const guestPromise = makeGuestForHostVow(vow, guestReturnedP);
// Note that `guestPromise` is not registered in the bijection since
// guestReturnedP is already the guest for vow. Rather, the handler
// returns guestPromise to resolve guestReturnedP to guestPromise.
const { kind } = doReturn(callIndex, vow);
kind === 'return' || Fail`internal: "return" kind expected ${q(kind)}`;
return harden({
kind: 'return',
result: guestPromise,
});
} catch (problem) {
throw panic(problem);
}
};

const guestHandler = harden({
applyMethod(guestTarget, optVerb, guestArgs, guestReturnedP) {
if (optVerb === undefined) {
throw Panic`guest eventual call not yet supported: ${guestTarget}(${b(guestArgs)}) -> ${b(guestReturnedP)}`;
} else {
throw Panic`guest eventual send not yet supported: ${guestTarget}.${b(optVerb)}(${b(guestArgs)}) -> ${b(guestReturnedP)}`;
const callIndex = log.getIndex();
if (stopped || !bijection.hasGuest(guestTarget)) {
Fail`Sent from a previous run: ${guestTarget}`;
}
// TODO FIX BUG this is not quite right. When guestResultP is returned
// as the resolution of guestResultP, it create a visious cycle error.
const hostResultKit = makeVowKit();
bijection.init(guestReturnedP, hostResultKit.vow);
/** @type {Outcome} */
let outcome;
try {
const guestEntry = harden([
'checkSend',
guestTarget,
optVerb,
guestArgs,
callIndex,
]);
if (log.isReplaying()) {
const entry = log.nextEntry();
equate(
guestEntry,
entry,
`replay ${callIndex}:
${q(guestEntry)}
vs ${q(entry)}
`,
);
outcome = /** @type {Outcome} */ (nestInterpreter(callIndex));
} else {
const entry = guestToHost(guestEntry);
log.pushEntry(entry);
const [_op, hostTarget, _optVerb, hostArgs, _callIndex] = entry;
nestInterpreter(callIndex);
outcome = performSend(
hostTarget,
optVerb,
hostArgs,
callIndex,
hostResultKit,
guestReturnedP,
);
}
} catch (fatalError) {
throw panic(fatalError);
}

switch (outcome.kind) {
case 'return': {
return outcome.result;
}
case 'throw': {
throw outcome.problem;
}
default: {
// @ts-expect-error TS correctly knows this case would be outside
// the type. But that's what we want to check.
throw Panic`unexpected outcome kind ${q(outcome.kind)}`;
}
}
},
applyFunction(guestTarget, guestArgs, guestReturnedP) {
Expand Down Expand Up @@ -315,11 +416,19 @@ export const makeReplayMembrane = (

/**
* @param {Vow} hVow
* @param {Promise} [promiseKey]
* If provided, use this promise as the key in the guestPromiseMap
* rather than the returned promise. This only happens when the
* promiseKey ends up forwarded to the returned promise anyway, so
* associating it with this resolve/reject pair is not incorrect.
* It is needed when `promiseKey` is also entered into the bijection
* paired with hVow.
* @returns {Promise}
*/
const makeGuestForHostVow = hVow => {
const makeGuestForHostVow = (hVow, promiseKey = undefined) => {
const { promise, resolve, reject } = makeGuestPromiseKit();
guestPromiseMap.set(promise, harden({ resolve, reject }));
promiseKey ??= promise;
guestPromiseMap.set(promiseKey, harden({ resolve, reject }));

watchWake(hVow);

Expand All @@ -343,7 +452,7 @@ export const makeReplayMembrane = (
hVow,
async hostFulfillment => {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doFulfill', hVow, hostFulfillment]);
log.pushEntry(entry);
Expand All @@ -358,7 +467,7 @@ export const makeReplayMembrane = (
},
async hostReason => {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doReject', hVow, hostReason]);
log.pushEntry(entry);
Expand Down
14 changes: 7 additions & 7 deletions packages/async-flow/src/type-guards.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ export const LogEntryShape = M.or(
M.arrayOf(M.any()),
M.number(),
],
// [
// 'checkSend',
// M.or(M.remotable('host target'), VowShape),
// M.opt(PropertyKeyShape),
// M.arrayOf(M.any()),
// M.number(),
// ],
[
'checkSend',
M.or(M.remotable('host target'), VowShape),
M.opt(PropertyKeyShape),
M.arrayOf(M.any()),
M.number(),
],
// ['checkReturn', M.number(), M.any()],
// ['checkThrow', M.number(), M.any()],
);
6 changes: 6 additions & 0 deletions packages/async-flow/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@
* optVerb: PropertyKey|undefined,
* args: Host[],
* callIndex: number
* ] | [
* op: 'checkSend',
* target: Host,
* optVerb: PropertyKey|undefined,
* args: Host[],
* callIndex: number
* ]} LogEntry
*/

Expand Down
108 changes: 102 additions & 6 deletions packages/async-flow/test/replay-membrane-eventual.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from './prepare-test-env-ava.js';

import { Fail } from '@endo/errors';
import { eventLoopIteration } from '@agoric/internal/src/testing-utils.js';
import { prepareVowTools } from '@agoric/vow';
import { E } from '@endo/eventual-send';
// import E from '@agoric/vow/src/E.js';
Expand Down Expand Up @@ -39,15 +40,19 @@ const preparePingee = zone =>
*/
const testFirstPlay = async (t, zone) => {
const vowTools = prepareVowTools(zone);
const { makeVowKit } = vowTools;
const makeLogStore = prepareLogStore(zone);
const makeBijection = prepareBijection(zone);
const makePingee = preparePingee(zone);
const { vow: v1, resolver: r1 } = zone.makeOnce('v1', () => makeVowKit());
const { vow: _v2, resolver: _r2 } = zone.makeOnce('v2', () => makeVowKit());

const log = zone.makeOnce('log', () => makeLogStore());
const bij = zone.makeOnce('bij', makeBijection);

const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic);

const p1 = mem.hostToGuest(v1);
t.deepEqual(log.dump(), []);

/** @type {Pingee} */
Expand All @@ -56,18 +61,105 @@ const testFirstPlay = async (t, zone) => {
const guestPingee = mem.hostToGuest(pingee);
t.deepEqual(log.dump(), []);

const pingTestSendResult = t.throwsAsync(() => E(guestPingee).ping('send'), {
message:
'panic over "[Error: guest eventual send not yet supported: \\"[Alleged: Pingee guest wrapper]\\".ping([\\"send\\"]) -> \\"[Promise]\\"]"',
});
const p = E(guestPingee).ping('send');

guestPingee.ping('call');

t.is(await p, undefined);
const dump = log.dump();
const v3 = dump[3][2];
t.deepEqual(dump, [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
]);

r1.resolve('x');
t.is(await p1, 'x');

t.deepEqual(log.dump(), [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
['doFulfill', v1, 'x'],
]);
};

/**
* @param {any} t
* @param {Zone} zone
*/
const testReplay = async (t, zone) => {
const vowTools = prepareVowTools(zone);
prepareLogStore(zone);
prepareBijection(zone);
preparePingee(zone);
const { vow: v1 } = zone.makeOnce('v1', () => Fail`need v1`);
const { vow: v2, resolver: r2 } = zone.makeOnce('v2', () => Fail`need v2`);

const log = /** @type {LogStore} */ (
zone.makeOnce('log', () => Fail`need log`)
);
const bij = /** @type {Bijection} */ (
zone.makeOnce('bij', () => Fail`need bij`)
);

const pingee = zone.makeOnce('pingee', () => Fail`need pingee`);

const dump = log.dump();
const v3 = dump[3][2];
t.deepEqual(dump, [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
['doFulfill', v1, 'x'],
]);

const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic);
t.true(log.isReplaying());
t.is(log.getIndex(), 0);

const guestPingee = mem.hostToGuest(pingee);
const p2 = mem.hostToGuest(v2);
// @ts-expect-error TS doesn't know that r2 is a resolver
r2.resolve('y');
await eventLoopIteration();

const p1 = mem.hostToGuest(v1);
mem.wake();
t.true(log.isReplaying());
t.is(log.getIndex(), 0);
t.deepEqual(log.dump(), [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
['doFulfill', v1, 'x'],
]);

E(guestPingee).ping('send');

guestPingee.ping('call');

await pingTestSendResult;
t.is(await p1, 'x');
t.is(await p2, 'y');
t.false(log.isReplaying());

t.deepEqual(log.dump(), [
['checkCall', pingee, 'ping', ['call'], 0],
['doReturn', 0, undefined],
['checkSend', pingee, 'ping', ['send'], 2],
['doReturn', 2, v3],
['doFulfill', v3, undefined],
['doFulfill', v1, 'x'],
['doFulfill', v2, 'y'],
]);
};

Expand All @@ -87,5 +179,9 @@ test.serial('test durable replay-membrane settlement', async t => {

nextLife();
const zone1 = makeDurableZone(getBaggage(), 'durableRoot');
return testFirstPlay(t, zone1);
await testFirstPlay(t, zone1);

nextLife();
const zone3 = makeDurableZone(getBaggage(), 'durableRoot');
return testReplay(t, zone3);
});

0 comments on commit a461f24

Please sign in to comment.