Skip to content

Commit

Permalink
Merge branch 'markm-async-flow' into markm-asyncFlow-E
Browse files Browse the repository at this point in the history
  • Loading branch information
erights committed May 15, 2024
2 parents 06408e8 + 2ebefd4 commit 174eb1c
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 68 deletions.
5 changes: 1 addition & 4 deletions golang/cosmos/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/armon/go-metrics v0.4.1
github.com/cosmos/cosmos-sdk v0.46.16
github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2
github.com/cosmos/ibc-go/v6 v6.2.1
github.com/cosmos/ibc-go/v6 v6.3.1
github.com/gogo/protobuf v1.3.3
github.com/golang/protobuf v1.5.3
github.com/gorilla/mux v1.8.0
Expand Down Expand Up @@ -192,9 +192,6 @@ replace (
// Pick up an IAVL race fix.
github.com/cosmos/iavl => github.com/cosmos/iavl v0.19.7

// Async version negotiation
github.com/cosmos/ibc-go/v6 => github.com/agoric-labs/ibc-go/v6 v6.2.1-alpha.agoric.3

// use cometbft
// Use our fork at least until post-v0.34.14 is released with
// https://github.com/tendermint/tendermint/issue/6899 resolved.
Expand Down
4 changes: 2 additions & 2 deletions golang/cosmos/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ github.com/agoric-labs/cosmos-sdk v0.46.16-alpha.agoric.2.4 h1:i5IgChQjTyWulV/y5
github.com/agoric-labs/cosmos-sdk v0.46.16-alpha.agoric.2.4/go.mod h1:d7e4h+w7FNBNmE6ysp6duBVuQg67pqMtvsLwpT9ca3E=
github.com/agoric-labs/cosmos-sdk/ics23/go v0.8.0-alpha.agoric.1 h1:2jvHI/2d+psWAZy6FQ0vXJCHUtfU3ZbbW+pQFL04arQ=
github.com/agoric-labs/cosmos-sdk/ics23/go v0.8.0-alpha.agoric.1/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg=
github.com/agoric-labs/ibc-go/v6 v6.2.1-alpha.agoric.3 h1:YqvVwK+Lg/ZsuwyVm9UbPs8K55fg00R3Y9KnmaTBdgc=
github.com/agoric-labs/ibc-go/v6 v6.2.1-alpha.agoric.3/go.mod h1:V9NOCRS9RPkSJNJQIPRAjZn/lo2mCAAKOSv3/83ISDY=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -382,6 +380,8 @@ github.com/cosmos/iavl v0.19.7 h1:ij32FaEnwxfEurtK0QKDNhTWFnz6NUmrI5gky/WnoY0=
github.com/cosmos/iavl v0.19.7/go.mod h1:X9PKD3J0iFxdmgNLa7b2LYWdsGd90ToV5cAONApkEPw=
github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2 h1:Hz4nkpStoXIHrC77CIEyu2mRiN2qysGEZPFRf0fpv7w=
github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2/go.mod h1:Jo934o/sW7fNxuOa/TjCalSalz+1Fd649eLyANaJx8g=
github.com/cosmos/ibc-go/v6 v6.3.1 h1:/5ur3AsmNW8WuOevfODHlaY5Ze236PBNE3vVo9o3fQA=
github.com/cosmos/ibc-go/v6 v6.3.1/go.mod h1:Dm14j9s094bGyCEE8W4fD+2t8IneHv+cz+80Mvwjr1w=
github.com/cosmos/keyring v1.2.0 h1:8C1lBP9xhImmIabyXW4c3vFjjLiBdGCmfLUfeZlV1Yo=
github.com/cosmos/keyring v1.2.0/go.mod h1:fc+wB5KTk9wQ9sDx0kFXB3A0MaeGHM9AwRStKOQ5vOA=
github.com/cosmos/ledger-cosmos-go v0.12.4 h1:drvWt+GJP7Aiw550yeb3ON/zsrgW0jgh5saFCr7pDnw=
Expand Down
5 changes: 3 additions & 2 deletions golang/cosmos/x/vibc/types/ibc_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
channeltypes "github.com/cosmos/ibc-go/v6/modules/core/04-channel/types"
porttypes "github.com/cosmos/ibc-go/v6/modules/core/05-port/types"
host "github.com/cosmos/ibc-go/v6/modules/core/24-host"
ibckeeper "github.com/cosmos/ibc-go/v6/modules/core/keeper"

"github.com/cosmos/ibc-go/v6/modules/core/exported"

Expand All @@ -19,7 +18,9 @@ const (
// asynchronous versions. If it does, then the VM must supply an empty
// version string to indicate that the VM explicitly (possibly async)
// performs the Write* method.
AsyncVersions = ibckeeper.AsyncVersionNegotiation
// This flag is created in anticipation of ibc-go implementing async versions,
// see https://github.com/Agoric/agoric-sdk/issues/9358 for more details.
AsyncVersions = false
)

var (
Expand Down
62 changes: 49 additions & 13 deletions packages/async-flow/src/async-flow.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { annotateError, Fail, makeError, X } from '@endo/errors';
import { annotateError, Fail, makeError, q, X } from '@endo/errors';
import { E } from '@endo/eventual-send';
import { M } from '@endo/patterns';
import { makeScalarWeakMapStore } from '@agoric/store';
import { PromiseWatcherI } from '@agoric/base-zone';
import { prepareVowTools, toPassableCap, VowShape } from '@agoric/vow';
import { makeReplayMembrane } from './replay-membrane.js';
import { prepareLogStore } from './log-store.js';
import { prepareWeakBijection } from './weak-bijection.js';
import { prepareBijection } from './bijection.js';
import { LogEntryShape, FlowStateShape } from './type-guards.js';

const { defineProperties } = Object;
Expand Down Expand Up @@ -42,7 +42,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
const {
vowTools = prepareVowTools(outerZone),
makeLogStore = prepareLogStore(outerZone),
makeWeakBijection = prepareWeakBijection(outerZone),
makeBijection = prepareBijection(outerZone),
} = outerOptions;
const { watch, makeVowKit } = vowTools;

Expand Down Expand Up @@ -95,7 +95,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
activationArgs => {
harden(activationArgs);
const log = makeLogStore();
const bijection = makeWeakBijection();
const bijection = makeBijection();

return {
activationArgs, // replay starts by reactivating with these
Expand Down Expand Up @@ -257,13 +257,50 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
const { flow } = facets;

const flowState = flow.getFlowState();
if (flowState === 'Done' || flowState === 'Failed') {
return;
}
if (hasMembrane(flow)) {
getMembrane(flow).wake();
} else {
flow.restart();
switch (flowState) {
case 'Done':
case 'Failed': {
return;
}
case 'Running':
case 'Replaying': {
// Safe to call membrane wake for a replaying or running flow
// because it is idempotent. membrane.wake already has reentrancy
// protection. Aside from harmless reentrancy, calling
// membrane.wake won't cause it to do anything that it would
// not have done on its own.
//
// An interesting edge case is that when the guest proceeds
// from a top-level doReturn or doThrow, while we're still in
// the guest turn, if somehow flow.wake were to be called then,
// and if the next thing in the replay log was a `doCall`
// (a future feature), then the `doCall` would call the guest
// while it was still in the middle of a "past" turn. However,
// this cannot happen because `flow` is host-side. For it to
// be called while the guest is active, the membrane's
// `callStack` would not be empty. membrane.wake checks and
// already throws an error in that case.
//
// More important, during a replay, no guest action can actually
// call host functions at all. Rather, the host is fully
// emulated from the log. So this case cannot arise.
//
// This analysis *assumes* that the guest function has no access
// to the flow outside the membrane, i.e., the "closed guest"
// assumption.
getMembrane(flow).wake();
return;
}
case 'Sleeping': {
flow.restart();
return;
}
default: {
// Should be a at-ts-expect-error that this case is unreachable
// which TS clearly knows anyway because it thinks the following
// `flowState` variable in this context has type `never`.
throw Fail`unexpected flowState ${q(flowState)}`;
}
}
},
getOutcome() {
Expand All @@ -289,6 +326,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
const { state, facets } = this;
const { bijection, log } = state;
const { flow } = facets;
!state.isDone || Fail`Cannot reset a done flow`;

if (failures.has(flow)) {
failures.delete(flow);
Expand All @@ -299,8 +337,6 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
}
log.reset();
bijection.reset();

state.isDone = false;
},
complete() {
const { state, facets } = this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Far } from '@endo/pass-style';
import { toPassableCap } from '@agoric/vow';
import { makeEphemera } from './ephemera.js';

const WeakBijectionI = M.interface('WeakBijection', {
const BijectionI = M.interface('Bijection', {
reset: M.call().returns(),
init: M.call(M.any(), M.any()).returns(),
hasGuest: M.call(M.any()).returns(M.boolean()),
Expand All @@ -22,10 +22,15 @@ const WeakBijectionI = M.interface('WeakBijection', {
* @param {string} name
*/
const makeVowishStore = name => {
// The vowMap would be needed if we supported enumeration,
// in order to reconstruct the original keys.
// const vowMap = new Map();
const map = new WeakMap();
// This internal map could be (and was) a WeakMap. But there are various ways
// in which a WeakMap is more expensive than a Map. The main advantage is
// that a WeakMap can drop entries whose keys are not otherwise retained.
// But async-flow only uses a bijection together with a log-store that happens
// to durably retain all the host-side keys of the associated bijection, so
// this additional feature of the bijection is irrelevant. When the bijection
// is reset or revived in a new incarnation, these vowishStores will be gone
// anyway, dropping all the guest-side objects.
const map = new Map();

return Far(name, {
init: (k, v) => {
Expand All @@ -51,13 +56,13 @@ const makeVowishStore = name => {
/**
* @param {Zone} zone
*/
export const prepareWeakBijection = zone => {
/** @type {Ephemera<WeakBijection, VowishStore>} */
export const prepareBijection = zone => {
/** @type {Ephemera<Bijection, VowishStore>} */
const g2h = makeEphemera(() => makeVowishStore('guestToHost'));
/** @type {Ephemera<WeakBijection, VowishStore>} */
/** @type {Ephemera<Bijection, VowishStore>} */
const h2g = makeEphemera(() => makeVowishStore('hostToGuest'));

return zone.exoClass('WeakBijection', WeakBijectionI, () => ({}), {
return zone.exoClass('Bijection', BijectionI, () => ({}), {
reset() {
const { self } = this;

Expand Down Expand Up @@ -120,8 +125,8 @@ export const prepareWeakBijection = zone => {
},
});
};
harden(prepareWeakBijection);
harden(prepareBijection);

/**
* @typedef {ReturnType<ReturnType<prepareWeakBijection>>} WeakBijection
* @typedef {ReturnType<ReturnType<prepareBijection>>} Bijection
*/
4 changes: 4 additions & 0 deletions packages/async-flow/src/log-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ export const prepareLogStore = zone => {
reset() {
const { self } = this;
tmp.resetFor(self);

// TODO: Should we resolve replayDoneKit here, in case we're
// transitioning to a Failed state, so that any pending watchers
// can exit?
},
dispose() {
const { state, self } = this;
Expand Down
40 changes: 22 additions & 18 deletions packages/async-flow/src/replay-membrane.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const { fromEntries, defineProperties, assign } = Object;

/**
* @param {LogStore} log
* @param {WeakBijection} bijection
* @param {Bijection} bijection
* @param {VowTools} vowTools
* @param {(vowish: Promise | Vow) => void} watchWake
* @param {(problem: Error) => never} panic
Expand Down Expand Up @@ -41,8 +41,8 @@ export const makeReplayMembrane = (
const doFulfill = (hostVow, hostFulfillment) => {
const guestPromise = hostToGuest(hostVow);
const status = guestPromiseMap.get(guestPromise);
if (status === 'settled') {
return;
if (!status || status === 'settled') {
Fail`doFulfill should only be called on a registered unresolved promise`;
}
const guestFulfillment = hostToGuest(hostFulfillment);
status.resolve(guestFulfillment);
Expand All @@ -60,8 +60,8 @@ export const makeReplayMembrane = (
const doReject = (hostVow, hostReason) => {
const guestPromise = hostToGuest(hostVow);
const status = guestPromiseMap.get(guestPromise);
if (status === 'settled') {
return;
if (!status || status === 'settled') {
Fail`doReject should only be called on a registered unresolved promise`;
}
const guestReason = hostToGuest(hostReason);
status.reject(guestReason);
Expand Down Expand Up @@ -183,14 +183,18 @@ export const makeReplayMembrane = (
throw panic(fatalError);
}

if (outcome.kind === 'return') {
return outcome.result;
} else {
outcome.kind === 'throw' ||
switch (outcome.kind) {
case 'return': {
return outcome.result;
}
case 'throw': {
throw outcome.problem;
}
default: {
// @ts-expect-error TS correctly knows this case would be outside
// the type. But that's what we want to check.
Fail`unexpected outcome kind ${q(outcome.kind)}`;
throw outcome.problem;
throw Fail`unexpected outcome kind ${q(outcome.kind)}`;
}
}
};

Expand Down Expand Up @@ -251,7 +255,7 @@ export const makeReplayMembrane = (
};
if (optVerb) {
defineProperties(guestMethod, {
name: { value: String(optVerb) },
name: { value: String(hRem[optVerb].name || optVerb) },
length: { value: Number(hRem[optVerb].length || 0) },
});
} else {
Expand Down Expand Up @@ -297,21 +301,21 @@ export const makeReplayMembrane = (
void when(
hVow,
async hostFulfillment => {
await log.promiseReplayDone();
if (guestPromiseMap.get(promise) !== 'settled') {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doFulfill', hVow, hostFulfillment]);
log.pushEntry(entry);
interpretOne(topDispatch, entry);
interpretOne(topDispatch, entry); // does its own panic
}
},
async hostReason => {
await log.promiseReplayDone();
if (guestPromiseMap.get(promise) !== 'settled') {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doReject', hVow, hostReason]);
log.pushEntry(entry);
interpretOne(topDispatch, entry);
interpretOne(topDispatch, entry); // does its own panic
}
},
);
Expand Down
4 changes: 2 additions & 2 deletions packages/async-flow/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* @import {Zone} from '@agoric/base-zone'
* @import {Vow, VowTools} from '@agoric/vow'
* @import {LogStore} from './log-store.js'
* @import {WeakBijection} from './weak-bijection.js'
* @import {Bijection} from './bijection.js'
* @import {ReplayMembrane} from './replay-membrane.js'
*/

Expand Down Expand Up @@ -47,7 +47,7 @@
* @typedef {object} PreparationOptions
* @property {VowTools} [vowTools]
* @property {() => LogStore} [makeLogStore]
* @property {() => WeakBijection} [makeWeakBijection]
* @property {() => Bijection} [makeBijection]
*/

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import { makeHeapZone } from '@agoric/zone/heap.js';
import { makeVirtualZone } from '@agoric/zone/virtual.js';
import { makeDurableZone } from '@agoric/zone/durable.js';

import { prepareWeakBijection } from '../src/weak-bijection.js';
import { prepareBijection } from '../src/bijection.js';

/**
* @param {any} t
* @param {Zone} zone
*/
const testBijection = (t, zone) => {
const { makeVowKit } = prepareVowTools(zone);
const makeBijection = prepareWeakBijection(zone);
const makeBijection = prepareBijection(zone);
const bij = zone.makeOnce('bij', makeBijection);

const h1 = zone.exo('h1', undefined, {});
Expand Down
4 changes: 2 additions & 2 deletions packages/async-flow/test/convert.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { makeVirtualZone } from '@agoric/zone/virtual.js';
import { makeDurableZone } from '@agoric/zone/durable.js';

import { makeConvertKit } from '../src/convert.js';
import { prepareWeakBijection } from '../src/weak-bijection.js';
import { prepareBijection } from '../src/bijection.js';

/**
* @param {any} t
Expand All @@ -25,7 +25,7 @@ import { prepareWeakBijection } from '../src/weak-bijection.js';
*/
const testConvert = (t, zone, showOnConsole = false) => {
const { makeVowKit } = prepareVowTools(zone);
const makeBijection = prepareWeakBijection(zone);
const makeBijection = prepareBijection(zone);
const bij = zone.makeOnce('bij', makeBijection);

const makeGuestForHostRemotable = hRem => {
Expand Down
4 changes: 2 additions & 2 deletions packages/async-flow/test/equate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { makeHeapZone } from '@agoric/zone/heap.js';
import { makeVirtualZone } from '@agoric/zone/virtual.js';
import { makeDurableZone } from '@agoric/zone/durable.js';

import { prepareWeakBijection } from '../src/weak-bijection.js';
import { prepareBijection } from '../src/bijection.js';
import { makeEquate } from '../src/equate.js';

/**
Expand All @@ -25,7 +25,7 @@ import { makeEquate } from '../src/equate.js';
*/
const testEquate = (t, zone, showOnConsole = false) => {
const { makeVowKit } = prepareVowTools(zone);
const makeBijection = prepareWeakBijection(zone);
const makeBijection = prepareBijection(zone);
const bij = zone.makeOnce('bij', makeBijection);

t.throws(() => zone.makeOnce('equate', () => makeEquate(bij)), {
Expand Down
Loading

0 comments on commit 174eb1c

Please sign in to comment.