From 54b9b009fff3fd3ab54f731adee97195acaa238f Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Thu, 8 Aug 2024 12:50:43 -0600 Subject: [PATCH 1/6] fix(network): introduce `Finalizer` to close network --- golang/cosmos/x/vibc/types/ibc_module.go | 7 +- golang/cosmos/x/vtransfer/ibc_middleware.go | 2 +- packages/network/src/network.js | 243 +++++++++++++------- packages/network/src/router.js | 6 +- packages/network/src/shapes.js | 6 +- packages/network/test/fakes.js | 6 +- packages/network/test/network-misc.test.js | 37 ++- packages/vats/src/ibc.js | 31 ++- packages/vats/src/types.d.ts | 4 +- packages/vats/src/vat-network.js | 4 +- packages/vats/test/network.test.js | 43 ++-- packages/vats/tools/fake-bridge.js | 10 + 12 files changed, 264 insertions(+), 135 deletions(-) diff --git a/golang/cosmos/x/vibc/types/ibc_module.go b/golang/cosmos/x/vibc/types/ibc_module.go index b5abb322c0f..a240c2414ab 100644 --- a/golang/cosmos/x/vibc/types/ibc_module.go +++ b/golang/cosmos/x/vibc/types/ibc_module.go @@ -1,6 +1,8 @@ package types import ( + fmt "fmt" + sdkioerrors "cosmossdk.io/errors" "github.com/Agoric/agoric-sdk/golang/cosmos/vm" capability "github.com/cosmos/cosmos-sdk/x/capability/types" @@ -221,7 +223,10 @@ func (im IBCModule) OnChanCloseInit( } err := im.impl.PushAction(ctx, event) - return err + if err != nil { + return err + } + return fmt.Errorf("OnChanCloseInit can only be sent by the VM") } type ChannelCloseConfirmEvent struct { diff --git a/golang/cosmos/x/vtransfer/ibc_middleware.go b/golang/cosmos/x/vtransfer/ibc_middleware.go index 2264ae87e5b..b47c76c2a63 100644 --- a/golang/cosmos/x/vtransfer/ibc_middleware.go +++ b/golang/cosmos/x/vtransfer/ibc_middleware.go @@ -50,7 +50,7 @@ func NewIBCMiddleware(ibcModule porttypes.IBCModule, vtransferKeeper keeper.Keep // wrapped IBCModule. They are not performed in the context of a packet, and so // do not need to be intercepted. -// OnChanCloseInit implements the IBCModule interface. +// OnChanOpenInit implements the IBCModule interface. func (im IBCMiddleware) OnChanOpenInit( ctx sdk.Context, order channeltypes.Order, diff --git a/packages/network/src/network.js b/packages/network/src/network.js index d6e1a3bbc86..029675c5324 100644 --- a/packages/network/src/network.js +++ b/packages/network/src/network.js @@ -10,15 +10,24 @@ import { Shape } from './shapes.js'; /// /** - * @import {AttemptDescription, Bytes, Closable, CloseReason, Connection, ConnectionHandler, Endpoint, ListenHandler, Port, Protocol, ProtocolHandler, ProtocolImpl} from './types.js'; + * @import {AttemptDescription, Bytes, CloseReason, Closable, Connection, ConnectionHandler, Endpoint, ListenHandler, Port, Protocol, ProtocolHandler, ProtocolImpl} from './types.js'; + * @import {PromiseVow, Remote, VowTools} from '@agoric/vow'; */ +/** @typedef {VowTools & { finalizer: Finalizer }} Powers */ + +const sink = () => {}; +harden(sink); + /** * Compatibility note: this must match what our peers use, so don't change it * casually. */ export const ENDPOINT_SEPARATOR = '/'; +// Mark the finalizer close reason. +export const CLOSE_REASON_FINALIZER = 'closed-by-finalizer'; + /** @param {unknown} err */ export const rethrowUnlessMissing = err => { // Ugly hack rather than being able to determine if the function @@ -67,7 +76,7 @@ function throwIfInvalidPortName(specifiedName) { /** * @typedef {object} ConnectionOpts * @property {Endpoint[]} addrs - * @property {import('@agoric/vow').Remote>[]} handlers + * @property {Remote>[]} handlers * @property {MapStore} conns * @property {WeakSetStore} current * @property {0|1} l @@ -76,9 +85,9 @@ function throwIfInvalidPortName(specifiedName) { /** * @param {import('@agoric/base-zone').Zone} zone - * @param {ReturnType} powers + * @param {Powers} powers */ -const prepareHalfConnection = (zone, { watch }) => { +const prepareHalfConnection = (zone, { watch, allVows, finalizer }) => { const makeHalfConnectionKit = zone.exoClassKit( 'Connection', Shape.ConnectionI, @@ -123,18 +132,20 @@ const prepareHalfConnection = (zone, { watch }) => { return watch(innerVow, this.facets.rethrowUnlessMissingWatcher); }, async close() { - const { closed, current, conns, l, handlers } = this.state; + const { closed, current, conns, l, r } = this.state; if (closed) { throw Error(closed); } this.state.closed = 'Connection closed'; - current.delete(conns.get(l)); + + // Tear down both sides. + const lconn = conns.get(l); + const rconn = conns.get(r); + current.delete(lconn); + current.delete(rconn); + const innerVow = watch( - E(this.state.handlers[l]).onClose( - conns.get(l), - undefined, - handlers[l], - ), + allVows([finalizer.finalize(lconn), finalizer.finalize(rconn)]), this.facets.sinkWatcher, ); @@ -176,11 +187,12 @@ const prepareHalfConnection = (zone, { watch }) => { /** * @param {import('@agoric/zone').Zone} zone - * @param {import('@agoric/vow').Remote>} handler0 + * @param {Remote>} handler0 * @param {Endpoint} addr0 - * @param {import('@agoric/vow').Remote>} handler1 + * @param {Remote>} handler1 * @param {Endpoint} addr1 * @param {(opts: ConnectionOpts) => Connection} makeConnection + * @param {Finalizer} finalizer * @param {WeakSetStore} [current] */ export const crossoverConnection = ( @@ -190,6 +202,7 @@ export const crossoverConnection = ( handler1, addr1, makeConnection, + finalizer, current = zone.detached().weakSetStore('crossoverCurrentConnections'), ) => { const detached = zone.detached(); @@ -197,7 +210,7 @@ export const crossoverConnection = ( /** @type {MapStore} */ const conns = detached.mapStore('addrToConnections'); - /** @type {import('@agoric/vow').Remote>[]} */ + /** @type {Remote>[]} */ const handlers = harden([handler0, handler1]); /** @type {Endpoint[]} */ const addrs = harden([addr0, addr1]); @@ -215,9 +228,13 @@ export const crossoverConnection = ( * @param {number} r remote side of the connection */ const openHalfConnection = (l, r) => { - current.add(conns.get(l)); + const lconn = conns.get(l); + current.add(lconn); + if (!finalizer.has(lconn)) { + finalizer.initConnection(lconn, handlers[l]); + } E(handlers[l]) - .onOpen(conns.get(l), addrs[l], addrs[r], handlers[l]) + .onOpen(lconn, addrs[l], addrs[r], handlers[l]) .catch(rethrowUnlessMissing); }; @@ -233,9 +250,9 @@ export const crossoverConnection = ( /** * @param {import('@agoric/zone').Zone} zone * @param {(opts: ConnectionOpts) => Connection} makeConnection - * @param {ReturnType} powers + * @param {Powers} powers */ -const prepareInboundAttempt = (zone, makeConnection, { watch }) => { +const prepareInboundAttempt = (zone, makeConnection, { watch, finalizer }) => { const makeInboundAttemptKit = zone.exoClassKit( 'InboundAttempt', Shape.InboundAttemptI, @@ -245,7 +262,7 @@ const prepareInboundAttempt = (zone, makeConnection, { watch }) => { * @param {string} opts.remoteAddr * @param {MapStore>} opts.currentConnections * @param {string} opts.listenPrefix - * @param {MapStore>]>} opts.listening + * @param {MapStore>]>} opts.listening */ ({ localAddr, @@ -288,6 +305,7 @@ const prepareInboundAttempt = (zone, makeConnection, { watch }) => { const current = currentConnections.get(port); current.delete(this.facets.inboundAttempt); + finalizer.unpin(this.facets.inboundAttempt); const innerVow = watch( E(listener).onReject(port, localAddr, remoteAddr, listener), @@ -300,7 +318,7 @@ const prepareInboundAttempt = (zone, makeConnection, { watch }) => { * @param {object} opts * @param {string} [opts.localAddress] * @param {string} [opts.remoteAddress] - * @param {import('@agoric/vow').Remote} opts.handler + * @param {Remote} opts.handler */ async accept({ localAddress, remoteAddress, handler: rchandler }) { const { consummated, localAddr, remoteAddr } = this.state; @@ -342,15 +360,12 @@ const prepareInboundAttempt = (zone, makeConnection, { watch }) => { return crossoverConnection( zone, - /** @type {import('@agoric/vow').Remote>} */ ( - lchandler - ), + /** @type {Remote>} */ (lchandler), localAddress, - /** @type {import('@agoric/vow').Remote>} */ ( - rchandler - ), + /** @type {Remote>} */ (rchandler), remoteAddress, makeConnection, + finalizer, current, )[1]; }, @@ -398,22 +413,22 @@ const RevokeState = /** @type {const} */ ({ /** * @param {import('@agoric/zone').Zone} zone - * @param {ReturnType} powers + * @param {Powers} powers */ const preparePort = (zone, powers) => { const makeIncapable = zone.exoClass('Incapable', undefined, () => ({}), {}); - const { watch, allVows } = powers; + const { finalizer, watch, allVows } = powers; /** * @param {object} opts * @param {Endpoint} opts.localAddr - * @param {MapStore>]>} opts.listening - * @param {SetStore>} opts.openConnections + * @param {MapStore>]>} opts.listening + * @param {SetStore>} opts.openConnections * @param {MapStore>} opts.currentConnections * @param {MapStore} opts.boundPorts - * @param {import('@agoric/vow').Remote} opts.protocolHandler - * @param {import('@agoric/vow').Remote} opts.protocolImpl + * @param {Remote} opts.protocolHandler + * @param {Remote} opts.protocolImpl */ const initPort = ({ localAddr, @@ -443,7 +458,7 @@ const preparePort = (zone, powers) => { // Works even after revoke(). return this.state.localAddr; }, - /** @param {import('@agoric/vow').Remote} listenHandler */ + /** @param {Remote} listenHandler */ async addListener(listenHandler) { const { revoked, listening, localAddr, protocolHandler } = this.state; @@ -458,9 +473,7 @@ const preparePort = (zone, powers) => { } listening.set(localAddr, [ this.facets.port, - /** @type {import('@agoric/vow').Remote>} */ ( - listenHandler - ), + /** @type {Remote>} */ (listenHandler), ]); E(lhandler).onRemove(lport, lhandler).catch(rethrowUnlessMissing); } else { @@ -468,9 +481,7 @@ const preparePort = (zone, powers) => { localAddr, harden([ this.facets.port, - /** @type {import('@agoric/vow').Remote>} */ ( - listenHandler - ), + /** @type {Remote>} */ (listenHandler), ]), ); } @@ -489,7 +500,7 @@ const preparePort = (zone, powers) => { ); return watch(innerVow, this.facets.rethrowUnlessMissingWatcher); }, - /** @param {import('@agoric/vow').Remote} listenHandler */ + /** @param {Remote} listenHandler */ async removeListener(listenHandler) { const { listening, localAddr, protocolHandler } = this.state; listening.has(localAddr) || Fail`Port ${localAddr} is not listening`; @@ -511,11 +522,11 @@ const preparePort = (zone, powers) => { }, /** * @param {Endpoint} remotePort - * @param {import('@agoric/vow').Remote} [connectionHandler] + * @param {Remote} [connectionHandler] */ async connect( remotePort, - connectionHandler = /** @type {import('@agoric/vow').Remote} */ ( + connectionHandler = /** @type {Remote} */ ( makeIncapable() ), ) { @@ -527,7 +538,7 @@ const preparePort = (zone, powers) => { return watch( E(protocolImpl).outbound(this.facets.port, dst, connectionHandler), this.facets.portConnectWatcher, - { revoked }, + { chandler: connectionHandler }, ); }, async revoke() { @@ -538,7 +549,6 @@ const preparePort = (zone, powers) => { Fail`Port ${localAddr} is already revoked`; this.state.revoked = RevokeState.REVOKING; - const revokeVow = watch( E(protocolHandler).onRevoke( this.facets.port, @@ -564,15 +574,16 @@ const preparePort = (zone, powers) => { }, }, portConnectWatcher: { - onFulfilled(conn, watchContext) { - const { revoked } = watchContext; - const { openConnections } = this.state; + onFulfilled(conn, { chandler }) { + const { openConnections, revoked } = this.state; + if (!finalizer.has(conn)) { + finalizer.initConnection(conn, chandler); + } if (revoked) { - void E(conn).close(); - } else { - openConnections.add(conn); + return finalizer.finalize(conn); } + openConnections.add(conn); return conn; }, }, @@ -586,8 +597,8 @@ const preparePort = (zone, powers) => { const ps = []; ps.push( - ...values.map(conn => - watch(E(conn).close(), this.facets.sinkWatcher), + ...values.map(obj => + watch(finalizer.finalize(obj), this.facets.sinkWatcher), ), ); @@ -650,12 +661,12 @@ const preparePort = (zone, powers) => { /** * @param {import('@agoric/base-zone').Zone} zone - * @param {ReturnType} powers + * @param {Powers} powers */ const prepareBinder = (zone, powers) => { const makeConnection = prepareHalfConnection(zone, powers); - const { watch } = powers; + const { watch, finalizer } = powers; const makeInboundAttempt = prepareInboundAttempt( zone, @@ -730,8 +741,8 @@ const prepareBinder = (zone, powers) => { * @param {object} opts * @param {MapStore>} opts.currentConnections * @param {MapStore} opts.boundPorts - * @param {MapStore>]>} opts.listening - * @param {import('@agoric/vow').Remote} opts.protocolHandler + * @param {MapStore>]>} opts.listening + * @param {Remote} opts.protocolHandler */ ({ currentConnections, boundPorts, listening, protocolHandler }) => { /** @type {SetStore} */ @@ -777,7 +788,7 @@ const prepareBinder = (zone, powers) => { const innerVow = watch( E( - /** @type {import('@agoric/vow').Remote>} */ ( + /** @type {Remote>} */ ( protocolHandler ), ).onInstantiate( @@ -819,7 +830,7 @@ const prepareBinder = (zone, powers) => { // Allocate a local address. const instantiateInnerVow = watch( E( - /** @type {import('@agoric/vow').Remote>} */ ( + /** @type {Remote>} */ ( protocolHandler ), ).onInstantiate(port, localAddr, remoteAddr, protocolHandler), @@ -904,6 +915,7 @@ const prepareBinder = (zone, powers) => { }); current.add(inboundAttempt); + finalizer.initCloser(inboundAttempt); return inboundAttempt; }, }, @@ -945,7 +957,7 @@ const prepareBinder = (zone, powers) => { const innerVow = watch( E( - /** @type {import('@agoric/vow').Remote>} */ ( + /** @type {Remote>} */ ( protocolHandler ), ).onInstantiate( @@ -1008,15 +1020,12 @@ const prepareBinder = (zone, powers) => { return crossoverConnection( zone, - /** @type {import('@agoric/vow').Remote>} */ ( - lchandler - ), + /** @type {Remote>} */ (lchandler), negotiatedLocalAddress || requestedLocalAddress, - /** @type {import('@agoric/vow').Remote>} */ ( - rchandler - ), + /** @type {Remote>} */ (rchandler), negotiatedRemoteAddress || requestedRemoteAddress, makeConnection, + finalizer, current, )[0]; }, @@ -1169,13 +1178,13 @@ const prepareBinder = (zone, powers) => { /** * @param {import('@agoric/base-zone').Zone} zone - * @param {ReturnType} powers + * @param {Powers} powers */ export const prepareNetworkProtocol = (zone, powers) => { const makeBinderKit = prepareBinder(zone, powers); /** - * @param {import('@agoric/vow').Remote} protocolHandler + * @param {Remote} protocolHandler * @returns {Protocol} */ const makeNetworkProtocol = protocolHandler => { @@ -1187,7 +1196,7 @@ export const prepareNetworkProtocol = (zone, powers) => { /** @type {MapStore} */ const boundPorts = detached.mapStore('addrToPort'); - /** @type {MapStore>]>} */ + /** @type {MapStore>]>} */ const listening = detached.mapStore('listening'); const { binder, protocolImpl } = makeBinderKit({ @@ -1261,17 +1270,17 @@ export const prepareEchoConnectionKit = zone => { }, /** * @param {Connection} _connection - * @param {CloseReason} [_reason] + * @param {CloseReason} [reason] * @param {ConnectionHandler} [_connectionHandler] */ - async onClose(_connection, _reason, _connectionHandler) { + async onClose(_connection, reason, _connectionHandler) { const { closed } = this.state; if (closed) { throw Error(closed); } - this.state.closed = 'Connection closed'; + this.state.closed = reason || 'Connection closed'; }, }, listener: { @@ -1293,14 +1302,14 @@ export const prepareEchoConnectionKit = zone => { * Create a protocol handler that just connects to itself. * * @param {import('@agoric/base-zone').Zone} zone - * @param {ReturnType} powers + * @param {VowTools} powers */ export function prepareLoopbackProtocolHandler(zone, { watch, allVows }) { const detached = zone.detached(); /** @param {string} [instancePrefix] */ const initHandler = (instancePrefix = 'nonce/') => { - /** @type {MapStore, import('@agoric/vow').Remote>]>} */ + /** @type {MapStore, Remote>]>} */ const listeners = detached.mapStore('localAddr'); return { @@ -1379,7 +1388,7 @@ export function prepareLoopbackProtocolHandler(zone, { watch, allVows }) { localAddr, harden([ port, - /** @type {import('@agoric/vow').Remote>} */ ( + /** @type {Remote>} */ ( listenHandler ), ]), @@ -1390,17 +1399,15 @@ export function prepareLoopbackProtocolHandler(zone, { watch, allVows }) { localAddr, harden([ port, - /** @type {import('@agoric/vow').Remote>} */ ( - listenHandler - ), + /** @type {Remote>} */ (listenHandler), ]), ); } }, /** - * @param {import('@agoric/vow').Remote} port + * @param {Remote} port * @param {Endpoint} localAddr - * @param {import('@agoric/vow').Remote} listenHandler + * @param {Remote} listenHandler * @param {*} _protocolHandler */ async onListenRemove(port, localAddr, listenHandler, _protocolHandler) { @@ -1453,7 +1460,7 @@ export function prepareLoopbackProtocolHandler(zone, { watch, allVows }) { /** * * @param {import('@agoric/base-zone').Zone} zone - * @param {ReturnType} powers + * @param {Powers} powers */ export const preparePortAllocator = (zone, { watch }) => zone.exoClass( @@ -1523,3 +1530,79 @@ export const preparePortAllocator = (zone, { watch }) => }, ); /** @typedef {ReturnType>} PortAllocator */ + +/** + * Return a package-specific singleton that pins objects until they are + * explicitly unpinned or finalized. It needs to pin objects only because they + * are resources that need to be released. + * + * The reason this functionality wasn't just baked into the other network exos + * is to maintain upgrade-compatible with minimal additional changes. + * + * @param {import('@agoric/base-zone').Zone} zone + * @param {VowTools} vowTools + */ +const prepareFinalizer = (zone, { watch }) => { + /** + * @type {MapStore<{}, + * { conn: Remote, handler: Remote>} | + * { closer: Remote<{ close(): PromiseVow }> } + * >} + */ + const objToFinalizerInfo = zone.mapStore('objToFinalizerInfo'); + return zone.exo('NetworkFinalizer', undefined, { + has(obj) { + return objToFinalizerInfo.has(obj); + }, + /** + * Add a connection and handler for an `onClose` method to be called upon + * finalization. + * @param {Remote} conn + * @param {Remote>} handler + */ + initConnection(conn, handler) { + objToFinalizerInfo.init(conn, harden({ conn, handler })); + }, + /** + * Add an object with a `close` method to be called (such as an + * `inboundAttempt`) upon finalization. + * @param {Remote<{ close(): PromiseVow }>} closer + */ + initCloser(closer) { + objToFinalizerInfo.init(closer, harden({ closer })); + }, + finalize(obj) { + if (!objToFinalizerInfo.has(obj)) { + return; + } + const disposeInfo = objToFinalizerInfo.get(obj); + if ('conn' in disposeInfo) { + // A connection+handler. + const { conn, handler } = disposeInfo; + objToFinalizerInfo.delete(obj); + return watch(E(handler).onClose(conn, CLOSE_REASON_FINALIZER, handler)); + } else if ('closer' in disposeInfo) { + // Just something with a `close` method. + const { closer } = disposeInfo; + objToFinalizerInfo.delete(obj); + return watch(E(closer).close()); + } + }, + unpin(obj) { + objToFinalizerInfo.delete(obj); + }, + }); +}; +harden(prepareFinalizer); + +/** + * @param {import('@agoric/base-zone').Zone} zone + * @param {VowTools} vowTools + * @returns {Powers} + */ +export const prepareNetworkPowers = (zone, vowTools) => { + const finalizer = prepareFinalizer(zone, vowTools); + return harden({ ...vowTools, finalizer }); +}; + +/** @typedef {ReturnType} Finalizer */ diff --git a/packages/network/src/router.js b/packages/network/src/router.js index cea73335bd2..b0b6f003511 100644 --- a/packages/network/src/router.js +++ b/packages/network/src/router.js @@ -10,8 +10,8 @@ import { ENDPOINT_SEPARATOR, prepareNetworkProtocol } from './network.js'; import { Shape } from './shapes.js'; /** - * @import {AttemptDescription, Bytes, Closable, CloseReason, Connection, ConnectionHandler, Endpoint, ListenHandler, Port, Protocol, ProtocolHandler, ProtocolImpl} from './types.js'; - * @import {PromiseVow, Remote, VowKit, VowResolver, VowTools} from '@agoric/vow'; + * @import {Endpoint, Port, Protocol, ProtocolHandler} from './types.js'; + * @import {PromiseVow, Remote, VowTools} from '@agoric/vow'; */ /** @@ -108,7 +108,7 @@ export const prepareRouter = zone => { * Create a router that behaves like a Protocol. * * @param {import('@agoric/base-zone').Zone} zone - * @param {ReturnType} powers + * @param {import('./network.js').Powers} powers * @param {typeof defaultE} [E] Eventual sender */ export const prepareRouterProtocol = (zone, powers, E = defaultE) => { diff --git a/packages/network/src/shapes.js b/packages/network/src/shapes.js index fab74462b3e..726b2261112 100644 --- a/packages/network/src/shapes.js +++ b/packages/network/src/shapes.js @@ -158,18 +158,18 @@ export const Shape = /** @type {const} */ harden({ ).returns(Shape2.Vow$(M.undefined())), }), protocolHandlerAcceptWatcher: M.interface('ProtocolHandlerAcceptWatcher', { - onFulfilled: M.call(M.any()).rest(M.any()).returns(), + onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()), }), protocolHandlerInstantiateWatcher: M.interface( 'ProtocolHandlerInstantiateWatcher', { - onFulfilled: M.call(M.any()).rest(M.any()).returns(), + onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()), }, ), protocolHandlerConnectWatcher: M.interface( 'ProtocolHandlerConnectWatcher', { - onFulfilled: M.call(M.any()).rest(M.any()).returns(), + onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()), }, ), rethrowUnlessMissingWatcher: M.interface('RethrowUnlessMissingWatcher', { diff --git a/packages/network/test/fakes.js b/packages/network/test/fakes.js index 70f3d65c44f..9ad80dca982 100644 --- a/packages/network/test/fakes.js +++ b/packages/network/test/fakes.js @@ -3,6 +3,7 @@ import { prepareVowTools } from '@agoric/vow'; import assert from 'node:assert/strict'; import { prepareEchoConnectionKit, + prepareNetworkPowers, prepareNetworkProtocol, preparePortAllocator, } from '../src/index.js'; @@ -104,9 +105,10 @@ export const prepareProtocolHandler = ( * @param {Zone} zone */ export const fakeNetworkEchoStuff = zone => { - const powers = prepareVowTools(zone); - const { makeVowKit, when } = powers; + const vowTools = prepareVowTools(zone); + const powers = prepareNetworkPowers(zone, vowTools); + const { makeVowKit, when } = powers; const makeNetworkProtocol = prepareNetworkProtocol(zone, powers); const makeEchoConnectionHandler = prepareEchoConnectionKit(zone); const makeProtocolHandler = prepareProtocolHandler( diff --git a/packages/network/test/network-misc.test.js b/packages/network/test/network-misc.test.js index 968cdf41339..eb56e40335a 100644 --- a/packages/network/test/network-misc.test.js +++ b/packages/network/test/network-misc.test.js @@ -12,7 +12,9 @@ import { prepareLoopbackProtocolHandler, prepareNetworkProtocol, prepareRouter, + prepareNetworkPowers, unparse, + CLOSE_REASON_FINALIZER, } from '../src/index.js'; import { fakeNetworkEchoStuff } from './fakes.js'; @@ -39,13 +41,11 @@ test('handled protocol', async t => { const port = await when(protocol.bindPort('/ibc/*/ordered')); - const { vow, resolver } = makeVowKit(); - const prepareTestProtocolHandler = () => { const makeTestProtocolHandler = zone.exoClass( 'TestProtocolHandler', undefined, - () => ({ resolver }), + resolver => ({ resolver }), { async onOpen(connection, localAddr, remoteAddr) { t.is(localAddr, '/ibc/*/ordered'); @@ -53,11 +53,11 @@ test('handled protocol', async t => { const ack = await when(E(connection).send('ping')); // log(ack); t.is(`${ack}`, 'ping', 'received pong'); - void connection.close(); + await connection.close(); }, async onClose(_connection, reason) { - t.is(reason, undefined, 'no close reason'); - this.state.resolver.resolve(null); + t.is(reason, CLOSE_REASON_FINALIZER, 'finalizer close reason'); + this.state.resolver.resolve(reason); }, async onReceive(_connection, bytes) { t.is(`${bytes}`, 'ping'); @@ -71,8 +71,9 @@ test('handled protocol', async t => { const makeTestProtocolHandler = prepareTestProtocolHandler(); - await port.connect('/ibc/*/ordered/echo', makeTestProtocolHandler()); - await when(vow); + const { vow, resolver } = makeVowKit(); + await port.connect('/ibc/*/ordered/echo', makeTestProtocolHandler(resolver)); + t.is(await when(vow), CLOSE_REASON_FINALIZER); await when(port.revoke()); }); @@ -159,16 +160,14 @@ test('protocol connection listen', async t => { const { vow, resolver } = makeVowKit(); const prepareConnectionHandler = () => { - let handler; - const makeConnectionHandler = zone.exoClass( 'connectionHandler', undefined, - () => ({ resolver }), + () => ({ handler: undefined, resolver }), { async onOpen(connection, _localAddr, _remoteAddr, connectionHandler) { t.assert(connectionHandler, `connectionHandler is tracked in onOpen`); - handler = connectionHandler; + this.state.handler = connectionHandler; const ack = await when(connection.send('ping')); t.is(`${ack}`, 'ping', 'received pong'); await when(connection.close()); @@ -176,18 +175,17 @@ test('protocol connection listen', async t => { async onClose(c, reason, connectionHandler) { t.is( connectionHandler, - handler, + this.state.handler, `connectionHandler is tracked in onClose`, ); - handler = undefined; + this.state.handler = undefined; t.assert(c, 'connection is passed to onClose'); - t.is(reason, undefined, 'no close reason'); - this.state.resolver.resolve(null); + this.state.resolver.resolve(reason); }, async onReceive(c, packet, connectionHandler) { t.is( connectionHandler, - handler, + this.state.handler, `connectionHandler is tracked in onReceive`, ); t.assert(c, 'connection is passed to onReceive'); @@ -285,7 +283,8 @@ test('protocol connection listen', async t => { test('loopback protocol', async t => { const zone = provideDurableZone('network-loopback-protocol'); - const powers = prepareVowTools(zone); + const vowTools = prepareVowTools(zone); + const powers = prepareNetworkPowers(zone, vowTools); const { makeVowKit, when } = powers; const makeLoopbackProtocolHandler = prepareLoopbackProtocolHandler( zone, @@ -365,7 +364,7 @@ test('loopback protocol', async t => { }); test('routing', async t => { - const zone = provideDurableZone('network-loopback-protocol'); + const zone = provideDurableZone('routing-protocol'); const makeRouter = prepareRouter(zone); const router = makeRouter(); t.deepEqual(router.getRoutes('/if/local'), [], 'get routes matches none'); diff --git a/packages/vats/src/ibc.js b/packages/vats/src/ibc.js index 475cabda795..52ec6a8d566 100644 --- a/packages/vats/src/ibc.js +++ b/packages/vats/src/ibc.js @@ -140,26 +140,32 @@ export const prepareIBCConnectionHandler = zone => { return protocolUtils.ibcSendPacket(packet, relativeTimeoutNs); }, /** @type {Required['onClose']} */ - async onClose() { + onClose(_conn, _reason) { const { portID, channelID } = this.state; - const { protocolUtils, channelKeyToSeqAck } = this.state; + const { protocolUtils, channelKeyToSeqAck, channelKeyToConnP } = + this.state; const packet = { source_port: portID, source_channel: channelID, }; - await protocolUtils.downcall('startChannelCloseInit', { - packet, - }); - const rejectReason = Error('Connection closed'); const channelKey = `${channelID}:${portID}`; - + const rejectReason = Error('Connection closed'); const seqToAck = channelKeyToSeqAck.get(channelKey); for (const ackKit of seqToAck.values()) { ackKit.resolver.reject(rejectReason); } channelKeyToSeqAck.delete(channelKey); + + // This Connection object is initiating the close event + if (channelKeyToConnP.has(channelKey)) { + channelKeyToConnP.delete(channelKey); + return protocolUtils.downcall('startChannelCloseInit', { + packet, + }); + } + return Promise.resolve(); }, }, ); @@ -586,11 +592,16 @@ export const prepareIBCProtocol = (zone, powers) => { break; } - case 'channelCloseInit': + // We ignore the close init tx message, since any decision to + // close should be left to the VM... + case 'channelCloseInit': { + break; + } + + // ... or received from the other side. case 'channelCloseConfirm': { const { portID, channelID } = - // could be either but that complicates line wrapping - /** @type {IBCEvent<'channelCloseInit'>} */ (obj); + /** @type {IBCEvent<'channelCloseConfirm'>} */ (obj); const channelKey = `${channelID}:${portID}`; if (channelKeyToConnP.has(channelKey)) { const conn = channelKeyToConnP.get(channelKey); diff --git a/packages/vats/src/types.d.ts b/packages/vats/src/types.d.ts index c49d17709d3..e4a7f330ace 100644 --- a/packages/vats/src/types.d.ts +++ b/packages/vats/src/types.d.ts @@ -192,8 +192,8 @@ type IBCPacketEvents = { timeoutPacket: { packet: IBCPacket; }; - channelCloseInit: ConnectingInfo; // TODO update - channelCloseConfirm: ConnectingInfo; // TODO update + channelCloseInit: { channelID: IBCChannelID; portID: IBCPortID }; + channelCloseConfirm: { channelID: IBCChannelID; portID: IBCPortID }; sendPacket: { relativeTimeoutNs: bigint; packet: IBCPacket }; }; diff --git a/packages/vats/src/vat-network.js b/packages/vats/src/vat-network.js index a76ac575f57..2b42b1c9b1c 100644 --- a/packages/vats/src/vat-network.js +++ b/packages/vats/src/vat-network.js @@ -3,6 +3,7 @@ import { makeDurableZone } from '@agoric/zone/durable.js'; import { prepareEchoConnectionKit, prepareLoopbackProtocolHandler, + prepareNetworkPowers, preparePortAllocator, prepareRouterProtocol, } from '@agoric/network'; @@ -11,7 +12,8 @@ import { Far } from '@endo/far'; export function buildRootObject(_vatPowers, _args, baggage) { const zone = makeDurableZone(baggage); - const powers = prepareVowTools(zone.subZone('vow')); + const vowTools = prepareVowTools(zone.subZone('vow')); + const powers = prepareNetworkPowers(zone, vowTools); const makeRouterProtocol = prepareRouterProtocol( zone.subZone('network'), diff --git a/packages/vats/test/network.test.js b/packages/vats/test/network.test.js index 84532ee4c28..7f4b739db1d 100644 --- a/packages/vats/test/network.test.js +++ b/packages/vats/test/network.test.js @@ -10,6 +10,7 @@ import { prepareVowTools } from '@agoric/vow/vat.js'; import { makeDurableZone } from '@agoric/zone/durable.js'; import { E } from '@endo/far'; +import { prepareNetworkPowers } from '@agoric/network'; import { buildRootObject as ibcBuildRootObject } from '../src/vat-ibc.js'; import { buildRootObject as networkBuildRootObject } from '../src/vat-network.js'; import { makeFakeIbcBridge } from '../tools/fake-bridge.js'; @@ -80,7 +81,8 @@ test('network - ibc', async t => { const ibcVat = E(ibcBuildRootObject)(null, null, provideBaggage('ibc')); const baggage = provideBaggage('network - ibc'); const zone = makeDurableZone(baggage); - const powers = prepareVowTools(zone); + const vowTools = prepareVowTools(zone); + const powers = prepareNetworkPowers(zone, vowTools); const { when } = powers; const makeDurablePublishKit = prepareDurablePublishKit( @@ -117,7 +119,10 @@ test('network - ibc', async t => { const makeIBCListener = prepareIBCListener(zone, makePlusOne); const testEcho = async () => { - await E(p).addListener(makeIBCListener({ publisher })); + const { publisher: voidP } = makeDurablePublishKit(); + + const listener = makeIBCListener({ publisher: voidP }); + await E(p).addListener(listener); t.log('Accepting an Inbound Connection'); const c = await when(E(p).connect('/ibc-port/port-1/unordered/foo')); @@ -128,26 +133,20 @@ test('network - ibc', async t => { t.log('Closing the Connection'); await when(E(c).close()); + await E(p).removeListener(listener); }; await testEcho(); const testIBCOutbound = async () => { + const listener = makeIBCListener({ publisher }); + await E(p).addListener(listener); + t.log('Connecting to a Remote Port'); const [hopName, portName, version] = ['connection-11', 'port-98', 'bar']; const remoteEndpoint = `/ibc-hop/${hopName}/ibc-port/${portName}/unordered/${version}`; const cP = E(p).connect(remoteEndpoint); - const evopen = await events.next(); - t.assert(!evopen.done); - t.deepEqual(evopen.value, [ - 'plusOne-open', - { - localAddr: '/ibc-port/port-1/unordered/foo', - remoteAddr: '/ibc-port/port-1', - }, - ]); - const ev2 = await events.next(); t.assert(!ev2.done); t.deepEqual(ev2.value, [ @@ -169,7 +168,11 @@ test('network - ibc', async t => { connectionHops: ['connection-11'], }); + t.log('Waiting for Connection'); const c = await when(cP); + + t.log('Waiting for events'); + const remoteAddress = c.getRemoteAddress(); const localAddress = c.getLocalAddress(); t.is( @@ -215,7 +218,21 @@ test('network - ibc', async t => { t.is(await when(ack), 'a-transfer-reply'); - await E(c).close(); + t.log('Closing the Connection'); + const closeV = E(c).close(); + const evclose = await events.next(); + t.assert(!evclose.done); + t.deepEqual(evclose.value, [ + 'startChannelCloseInit', + { + packet: { + source_channel: 'channel-1', + source_port: 'port-1', + }, + }, + ]); + + await when(closeV); }; await testIBCOutbound(); diff --git a/packages/vats/tools/fake-bridge.js b/packages/vats/tools/fake-bridge.js index 798d97846d0..acef4e54045 100644 --- a/packages/vats/tools/fake-bridge.js +++ b/packages/vats/tools/fake-bridge.js @@ -138,6 +138,16 @@ export const makeFakeIbcBridge = (zone, onToBridge) => { if (method === 'sendPacket') { const { packet } = params; return { ...packet, sequence: '39' }; + } else if (method === 'startChannelCloseInit') { + const { packet } = params; + if (hndlr) + E(hndlr) + .fromBridge({ + type: 'IBC_EVENT', + event: 'channelCloseConfirm', + packet, + }) + .catch(e => console.error(e)); } return undefined; }, From 503c30a58c97cd451f174027490f41a10fcc408c Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Thu, 8 Aug 2024 12:52:10 -0600 Subject: [PATCH 2/6] test(pegasus): update test for finalizers --- packages/pegasus/test/peg.test.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/pegasus/test/peg.test.js b/packages/pegasus/test/peg.test.js index bdf17045b9d..ec4d7f4a55b 100644 --- a/packages/pegasus/test/peg.test.js +++ b/packages/pegasus/test/peg.test.js @@ -5,6 +5,7 @@ import { E, Far } from '@endo/far'; import { prepareNetworkProtocol, prepareLoopbackProtocolHandler, + prepareNetworkPowers, } from '@agoric/network'; import bundleSource from '@endo/bundle-source'; @@ -43,8 +44,9 @@ async function testRemotePeg(t) { t.plan(24); // const zone = makeHeapZone(); - const zone = makeDurableZone(provideBaggage('peagsus')); - const powers = prepareVowTools(zone); + const zone = makeDurableZone(provideBaggage('pegasus')); + const vowTools = prepareVowTools(zone); + const powers = prepareNetworkPowers(zone, vowTools); const { makeVowKit, when } = powers; /** From 4117643a81be8f64700c25c3dede7bf650461592 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Thu, 8 Aug 2024 21:03:01 -0600 Subject: [PATCH 3/6] test(orch): update `network-fakes.js` for finalizers --- packages/orchestration/test/network-fakes.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/orchestration/test/network-fakes.ts b/packages/orchestration/test/network-fakes.ts index d7360331ab9..03bfe9ea419 100644 --- a/packages/orchestration/test/network-fakes.ts +++ b/packages/orchestration/test/network-fakes.ts @@ -2,6 +2,7 @@ import { VowTools } from '@agoric/vow'; import { prepareEchoConnectionKit, prepareLoopbackProtocolHandler, + prepareNetworkPowers, preparePortAllocator, prepareRouterProtocol, } from '@agoric/network'; @@ -279,8 +280,9 @@ export const setupFakeNetwork = ( zone: Zone, { vowTools }: { vowTools: VowTools }, ) => { - const makeRouterProtocol = prepareRouterProtocol(zone, vowTools); - const makePortAllocator = preparePortAllocator(zone, vowTools); + const powers = prepareNetworkPowers(zone, vowTools); + const makeRouterProtocol = prepareRouterProtocol(zone, powers); + const makePortAllocator = preparePortAllocator(zone, powers); const makeLoopbackProtocolHandler = prepareLoopbackProtocolHandler( zone, vowTools, From c2d9530e2d891bd9412969a43a9c5728cc3c2721 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Thu, 22 Aug 2024 20:29:59 -0600 Subject: [PATCH 4/6] feat(vats): upgrade the orchestration core --- .../proposals/e:upgrade-next/.gitignore | 2 +- golang/cosmos/app/upgrade.go | 6 ++- .../scripts/vats/upgrade-orch-core.js | 23 ++++++++++ .../proposals/upgrade-orch-core-proposal.js | 43 +++++++++++++++++++ 4 files changed, 71 insertions(+), 3 deletions(-) create mode 100644 packages/builders/scripts/vats/upgrade-orch-core.js create mode 100644 packages/vats/src/proposals/upgrade-orch-core-proposal.js diff --git a/a3p-integration/proposals/e:upgrade-next/.gitignore b/a3p-integration/proposals/e:upgrade-next/.gitignore index ea217e46ed6..da0da987d7b 100644 --- a/a3p-integration/proposals/e:upgrade-next/.gitignore +++ b/a3p-integration/proposals/e:upgrade-next/.gitignore @@ -3,4 +3,4 @@ add-LEMONS/ add-OLIVES/ upgrade-bank/ upgrade-provisionPool/ - +upgrade-orch-core/ diff --git a/golang/cosmos/app/upgrade.go b/golang/cosmos/app/upgrade.go index e8d0adf9d96..6fcd8996ad4 100644 --- a/golang/cosmos/app/upgrade.go +++ b/golang/cosmos/app/upgrade.go @@ -1,7 +1,6 @@ package gaia import ( - "github.com/Agoric/agoric-sdk/golang/cosmos/vm" swingsetkeeper "github.com/Agoric/agoric-sdk/golang/cosmos/x/swingset/keeper" sdk "github.com/cosmos/cosmos-sdk/types" @@ -37,7 +36,10 @@ func unreleasedUpgradeHandler(app *GaiaApp, targetUpgrade string) func(sdk.Conte if isFirstTimeUpgradeOfThisVersion(app, ctx) { // Each CoreProposalStep runs sequentially, and can be constructed from // one or more modules executing in parallel within the step. - CoreProposalSteps = []vm.CoreProposalStep{} + CoreProposalSteps = []vm.CoreProposalStep{ + // Upgrade orch-core to the latest version. + vm.CoreProposalStepForModules("@agoric/builders/scripts/vats/upgrade-orch-core.js"), + } } app.upgradeDetails = &upgradeDetails{ diff --git a/packages/builders/scripts/vats/upgrade-orch-core.js b/packages/builders/scripts/vats/upgrade-orch-core.js new file mode 100644 index 00000000000..a4d8a21087b --- /dev/null +++ b/packages/builders/scripts/vats/upgrade-orch-core.js @@ -0,0 +1,23 @@ +import { makeHelpers } from '@agoric/deploy-script-support'; + +/** @type {import('@agoric/deploy-script-support/src/externalTypes.js').CoreEvalBuilder} */ +export const defaultProposalBuilder = async ({ publishRef, install }) => + harden({ + sourceSpec: '@agoric/vats/src/proposals/upgrade-orch-core-proposal.js', + getManifestCall: [ + 'getManifestForUpgradingOrchCore', + { + bundleRefs: { + ibc: publishRef(install('@agoric/vats/src/vat-ibc.js')), + network: publishRef(install('@agoric/vats/src/vat-network.js')), + localchain: publishRef(install('@agoric/vats/src/vat-localchain.js')), + transfer: publishRef(install('@agoric/vats/src/vat-transfer.js')), + }, + }, + ], + }); + +export default async (homeP, endowments) => { + const { writeCoreProposal } = await makeHelpers(homeP, endowments); + await writeCoreProposal('upgrade-network', defaultProposalBuilder); +}; diff --git a/packages/vats/src/proposals/upgrade-orch-core-proposal.js b/packages/vats/src/proposals/upgrade-orch-core-proposal.js new file mode 100644 index 00000000000..3d57869e9bd --- /dev/null +++ b/packages/vats/src/proposals/upgrade-orch-core-proposal.js @@ -0,0 +1,43 @@ +import { E } from '@endo/far'; + +/** + * @param {BootstrapPowers & { + * consume: { + * vatAdminSvc: VatAdminSvc; + * vatStore: MapStore< + * string, + * import('@agoric/swingset-vat').CreateVatResults + * >; + * }; + * }} powers + * @param {object} options + * @param {{ bundleRefs: { [vatName: string]: VatSourceRef } }} options.options + */ +export const upgradeOrchCore = async ( + { consume: { vatAdminSvc, vatStore } }, + options, +) => { + const { bundleRefs } = options.options; + + for await (const [name, ref] of Object.entries(bundleRefs)) { + assert(ref.bundleID, `bundleID missing for ${name}`); + console.log(name, `BUNDLE ID: `, ref.bundleID); + const bundleCap = await E(vatAdminSvc).getBundleCap(ref.bundleID); + + const { adminNode } = await E(vatStore).get(name); + await E(adminNode).upgrade(bundleCap, {}); + } +}; + +export const getManifestForUpgradingOrchCore = (_powers, { bundleRefs }) => ({ + manifest: { + [upgradeOrchCore.name]: { + consume: { + vatAdminSvc: 'vatAdminSvc', + vatStore: 'vatStore', + }, + produce: {}, + }, + }, + options: { bundleRefs }, +}); From 6ca6016951c4623a16a874484c3c6199cff9e44c Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Mon, 26 Aug 2024 15:57:23 -0600 Subject: [PATCH 5/6] test(upgrade-next): update vat incarnation counts --- .../proposals/e:upgrade-next/initial.test.js | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/a3p-integration/proposals/e:upgrade-next/initial.test.js b/a3p-integration/proposals/e:upgrade-next/initial.test.js index 64da243326f..03015a21317 100644 --- a/a3p-integration/proposals/e:upgrade-next/initial.test.js +++ b/a3p-integration/proposals/e:upgrade-next/initial.test.js @@ -3,17 +3,18 @@ import test from 'ava'; import { getVatDetails } from '@agoric/synthetic-chain'; const vats = { - network: { incarnation: 0 }, - ibc: { incarnation: 0 }, - localchain: { incarnation: 0 }, + network: { incarnation: 1 }, + ibc: { incarnation: 1 }, + localchain: { incarnation: 1 }, + transfer: { incarnation: 1 }, walletFactory: { incarnation: 4 }, zoe: { incarnation: 2 }, }; test(`vat details`, async t => { - await null; - for (const [vatName, expected] of Object.entries(vats)) { - const actual = await getVatDetails(vatName); - t.like(actual, expected, `${vatName} details mismatch`); + const actual = {}; + for await (const vatName of Object.keys(vats)) { + actual[vatName] = await getVatDetails(vatName); } + t.like(actual, vats, `vat details are alike`); }); From 064ff1ad395856111b4d82bb68d8ab92f8d83f12 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Mon, 26 Aug 2024 17:02:52 -0600 Subject: [PATCH 6/6] test(boot): skip `basicFlows` test until #9939 is fixed --- packages/boot/test/orchestration/restart-contracts.test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/boot/test/orchestration/restart-contracts.test.ts b/packages/boot/test/orchestration/restart-contracts.test.ts index e6ef7216b6c..f1d28bc0630 100644 --- a/packages/boot/test/orchestration/restart-contracts.test.ts +++ b/packages/boot/test/orchestration/restart-contracts.test.ts @@ -178,7 +178,9 @@ test.serial('stakeAtom', async t => { // restarting that one. For them to share bootstrap they'll each need a unique // instance name, which will require paramatizing the the two builders scripts // and the two core-eval functions. -test.serial('basicFlows', async t => { +// +// TODO(#9939): Flaky under Node.js until liveslots problem exposed by vows is fixed. +test.serial.skip('basicFlows', async t => { const { walletFactoryDriver, buildProposal,