Skip to content

Commit

Permalink
Merge branch 'markm-async-flow' into markm-zonify-vowTools
Browse files Browse the repository at this point in the history
  • Loading branch information
erights committed May 14, 2024
2 parents 1866f56 + 7373149 commit 2d32d1e
Show file tree
Hide file tree
Showing 8 changed files with 703 additions and 339 deletions.
46 changes: 23 additions & 23 deletions packages/async-flow/src/async-flow.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { annotateError, Fail, makeError, X } from '@endo/errors';
import { E } from '@endo/eventual-send';
import { M } from '@endo/patterns';
import { makeScalarWeakMapStore } from '@agoric/store';
import { PromiseWatcherI } from '@agoric/base-zone';
import { prepareVowTools, toPassableCap, VowShape } from '@agoric/vow';
import { makeReplayMembrane } from './replay-membrane.js';
import { prepareLogStore } from './log-store.js';
import { prepareWeakBijection } from './weak-bijection.js';
import { makeEphemera } from './ephemera.js';
import { LogEntryShape, FlowStateShape } from './type-guards.js';

const { defineProperties } = Object;
Expand Down Expand Up @@ -55,11 +55,16 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
keyShape: M.remotable('flow'), // flowState !== 'Done'
});

const tmp = makeEphemera(() => ({
membrane: /** @type {ReplayMembrane} */ (
/** @type {unknown} */ (undefined)
), // initialized by restart
}));
/** @type WeakMapStore<AsyncFlow, ReplayMembrane> */
const membraneMap = makeScalarWeakMapStore('membraneFor', {
keyShape: M.remotable('flow'),
valueShape: M.remotable('membrane'),
});

const hasMembrane = flow => membraneMap.has(flow);
const getMembrane = flow => membraneMap.get(flow);
const initMembrane = (flow, membrane) => membraneMap.init(flow, membrane);
const deleteMembrane = flow => membraneMap.delete(flow);

/**
* So we can give out wrapper functions easily and recover flow objects
Expand Down Expand Up @@ -109,11 +114,10 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
const { state, facets } = this;
const { log, outcomeKit, isDone } = state;
const { flow } = facets;
const eph = tmp.for(flow);

if (isDone) {
eph.membrane === undefined ||
Fail`Done flow must drop membrane ${flow} ${eph.membrane}`;
!hasMembrane(flow) ||
Fail`Done flow must drop membrane ${flow} ${getMembrane(flow)}`;
!failures.has(flow) ||
Fail`Done flow must not be in failures ${flow} ${failures.get(flow)}`;
!eagerWakers.has(flow) ||
Expand All @@ -127,7 +131,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
if (failures.has(flow)) {
return 'Failed';
}
if (eph.membrane === undefined) {
if (!hasMembrane(flow)) {
log.getIndex() === 0 ||
Fail`Sleeping flow must play from log start ${flow} ${log.getIndex()}`;
return 'Sleeping';
Expand Down Expand Up @@ -178,8 +182,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
wakeWatch,
panic,
);
const eph = tmp.for(flow);
eph.membrane = membrane;
initMembrane(flow, membrane);
const guestArgs = membrane.hostToGuest(activationArgs);

const flowState = flow.getFlowState();
Expand Down Expand Up @@ -252,14 +255,13 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
wake() {
const { facets } = this;
const { flow } = facets;
const eph = tmp.for(flow);

const flowState = flow.getFlowState();
if (flowState === 'Done' || flowState === 'Failed') {
return;
}
if (eph.membrane) {
eph.membrane.wake();
if (hasMembrane(flow)) {
getMembrane(flow).wake();
} else {
flow.restart();
}
Expand Down Expand Up @@ -287,15 +289,14 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
const { state, facets } = this;
const { bijection, log } = state;
const { flow } = facets;
const eph = tmp.for(flow);

if (failures.has(flow)) {
failures.delete(flow);
}
if (eph.membrane) {
eph.membrane.stop();
if (hasMembrane(flow)) {
getMembrane(flow).stop();
deleteMembrane(flow);
}
tmp.resetFor(flow);
log.reset();
bijection.reset();

Expand All @@ -320,7 +321,6 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
const { state, facets } = this;
const { bijection, log } = state;
const { flow } = facets;
const eph = tmp.for(flow);

if (failures.has(flow)) {
const prevErr = failures.get(flow);
Expand All @@ -334,10 +334,10 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
failures.init(flow, fatalProblem);
}

if (eph.membrane) {
eph.membrane.stop();
if (hasMembrane(flow)) {
getMembrane(flow).stop();
deleteMembrane(flow);
}
tmp.resetFor(flow);
log.reset();
bijection.reset();

Expand Down
25 changes: 20 additions & 5 deletions packages/orchestration/src/examples/stakeAtom.contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/**
* @file Example contract that uses orchestration
*/
import { makeTracer } from '@agoric/internal';
import { makeTracer, StorageNodeShape } from '@agoric/internal';
import { makeDurableZone } from '@agoric/zone/durable.js';
import { V as E } from '@agoric/vow/vat.js';
import { M } from '@endo/patterns';
Expand All @@ -13,9 +13,20 @@ const trace = makeTracer('StakeAtom');
/**
* @import { Baggage } from '@agoric/vat-data';
* @import { IBCConnectionID } from '@agoric/vats';
* @import { TimerService } from '@agoric/time';
* @import { ICQConnection, OrchestrationService } from '../types.js';
*/

export const meta = harden({
privateArgsShape: {
orchestration: M.remotable('orchestration'),
storageNode: StorageNodeShape,
marshaller: M.remotable('Marshaller'),
timer: M.remotable('TimerService'),
},
});
export const privateArgsShape = meta.privateArgsShape;

/**
* @typedef {{
* hostConnectionId: IBCConnectionID;
Expand All @@ -31,14 +42,15 @@ const trace = makeTracer('StakeAtom');
* orchestration: OrchestrationService;
* storageNode: StorageNode;
* marshaller: Marshaller;
* timer: TimerService;
* }} privateArgs
* @param {Baggage} baggage
*/
export const start = async (zcf, privateArgs, baggage) => {
// TODO #9063 this roughly matches what we'll get from Chain<C>.getChainInfo()
const { hostConnectionId, controllerConnectionId, bondDenom } =
zcf.getTerms();
const { orchestration, marshaller, storageNode } = privateArgs;
const { orchestration, marshaller, storageNode, timer } = privateArgs;

const zone = makeDurableZone(baggage);

Expand All @@ -63,11 +75,14 @@ export const start = async (zcf, privateArgs, baggage) => {
const accountAddress = await E(account).getAddress();
trace('account address', accountAddress);
const { holder, invitationMakers } = makeStakingAccountKit(
account,
storageNode,
accountAddress,
icqConnection,
bondDenom,
{
account,
storageNode,
icqConnection,
timer,
},
);
return {
publicSubscribers: holder.getPublicTopics(),
Expand Down
Loading

0 comments on commit 2d32d1e

Please sign in to comment.