Skip to content

Commit

Permalink
Merge pull request #8775 from Agoric/8445-reintegrate
Browse files Browse the repository at this point in the history
fix: smartWallet watch ERTP purse balances across zoe upgrades
  • Loading branch information
mergify[bot] committed Jan 23, 2024
2 parents af57c38 + 3cfe392 commit 99fc025
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 44 deletions.
16 changes: 12 additions & 4 deletions packages/inter-protocol/test/smartWallet/test-psm-integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { AmountMath, makeIssuerKit } from '@agoric/ertp';
import { eventLoopIteration } from '@agoric/internal/src/testing-utils.js';
import { E } from '@endo/far';
import { NonNullish } from '@agoric/assert';
import { keyEQ } from '@agoric/store';

import { coalesceUpdates } from '@agoric/smart-wallet/src/utils.js';
import { Stable } from '@agoric/internal/src/tokens.js';
Expand Down Expand Up @@ -93,10 +94,17 @@ test('null swap', async t => {
t.is(await E.get(getBalanceFor(anchor.brand)).value, 0n);
t.is(await E.get(getBalanceFor(mintedBrand)).value, 0n);

t.deepEqual(currents[0].liveOffers, []);
t.deepEqual(currents[1].liveOffers, []);
t.deepEqual(currents[2].liveOffers, [['nullSwap', offer]]);
t.deepEqual(currents[3].liveOffers, []);
const index = currents.findIndex(x => {
return (
x.liveOffers[0] &&
x.liveOffers[0][0] === 'nullSwap' &&
keyEQ(x.liveOffers[0][1], offer)
);
});

t.deepEqual(currents[index - 1].liveOffers, []);
t.deepEqual(currents[index].liveOffers, [['nullSwap', offer]]);
t.deepEqual(currents[index + 1].liveOffers, []);
});

// we test this direction of swap because wanting anchor would require the PSM to have anchor in it first
Expand Down
4 changes: 3 additions & 1 deletion packages/smart-wallet/src/offerWatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ export const prepareOfferWatcher = baggage => {
*/
onRejected(err, seat) {
const { facets } = this;
void watchForNumWants(facets, seat);
if (isUpgradeDisconnection(err)) {
void watchForNumWants(facets, seat);
}
},
},
},
Expand Down
154 changes: 115 additions & 39 deletions packages/smart-wallet/src/smartWallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
objectMap,
StorageNodeShape,
} from '@agoric/internal';
import { observeNotifier } from '@agoric/notifier';
import { isUpgradeDisconnection } from '@agoric/internal/src/upgrade-api.js';
import { M, mustMatch } from '@agoric/store';
import {
appendToStoredArray,
Expand All @@ -22,8 +22,10 @@ import {
import {
makeScalarBigMapStore,
makeScalarBigWeakMapStore,
prepareExoClass,
prepareExoClassKit,
provide,
watchPromise,
} from '@agoric/vat-data';
import {
prepareRecorderKit,
Expand Down Expand Up @@ -271,6 +273,59 @@ export const prepareSmartWallet = (baggage, shared) => {

const makeOfferWatcher = prepareOfferWatcher(baggage);

const updateShape = {
value: AmountShape,
updateCount: M.bigint(),
};

const NotifierShape = M.remotable();
const amountWatcherGuard = M.interface('paymentWatcher', {
onFulfilled: M.call(updateShape, NotifierShape).returns(),
onRejected: M.call(M.any(), NotifierShape).returns(M.promise()),
});

const prepareAmountWatcher = () =>
prepareExoClass(
baggage,
'AmountWatcher',
amountWatcherGuard,
/**
* @param {Purse} purse
* @param {ReturnType<makeWalletWithResolvedStorageNodes>['helper']} helper
*/
(purse, helper) => ({ purse, helper }),
{
/**
* @param {{ value: Amount, updateCount: bigint | undefined }} updateRecord
* @param { Notifier<Amount> } notifier
* @returns {void}
*/
onFulfilled(updateRecord, notifier) {
const { helper, purse } = this.state;
helper.updateBalance(purse, updateRecord.value);
helper.watchNextBalance(
this.self,
notifier,
updateRecord.updateCount,
);
},
/**
* @param {unknown} err
* @returns {Promise<void>}
*/
onRejected(err) {
const { helper, purse } = this.state;
if (isUpgradeDisconnection(err)) {
return helper.watchPurse(purse); // retry
}
helper.logWalletError(`failed amount observer`, err);
throw err;
},
},
);

const makeAmountWatcher = prepareAmountWatcher();

/**
* @param {UniqueParams} unique
* @returns {State}
Expand Down Expand Up @@ -356,7 +411,9 @@ export const prepareSmartWallet = (baggage, shared) => {
.returns(M.promise()),
publishCurrentState: M.call().returns(),
watchPurse: M.call(M.eref(PurseShape)).returns(M.promise()),
repairUnwatchedSeats: M.call().returns(),
watchNextBalance: M.call(M.any(), NotifierShape, M.bigint()).returns(),
repairUnwatchedSeats: M.call().returns(M.promise()),
repairUnwatchedPurses: M.call().returns(M.promise()),
updateStatus: M.call(M.any()).returns(),
addContinuingOffer: M.call(
M.or(M.number(), M.string()),
Expand Down Expand Up @@ -478,32 +535,24 @@ export const prepareSmartWallet = (baggage, shared) => {

/** @type {(purse: ERef<Purse>) => Promise<void>} */
async watchPurse(purseRef) {
const { facets } = this;
const { helper } = this.facets;

// This would seem to fit the observeNotifier() pattern,
// but purse notifiers are not necessarily durable.
// If there is an error due to upgrade, retry watchPurse().

const purse = await purseRef; // promises don't fit in durable storage
const handler = makeAmountWatcher(purse, helper);

const { helper } = this.facets;
// publish purse's balance and changes
void E.when(
E(purse).getCurrentAmount(),
balance => helper.updateBalance(purse, balance),
err =>
facets.helper.logWalletError(
'initial purse balance publish failed',
err,
),
);
void observeNotifier(E(purse).getCurrentAmountNotifier(), {
updateState(balance) {
helper.updateBalance(purse, balance);
},
fail(reason) {
facets.helper.logWalletError(
'⚠️ failed updateState observer',
reason,
);
},
});
const notifier = await E(purse).getCurrentAmountNotifier();
const startP = E(notifier).getUpdateSince(undefined);
watchPromise(startP, handler, notifier);
},

watchNextBalance(handler, notifier, updateCount) {
const nextP = E(notifier).getUpdateSince(updateCount);
watchPromise(nextP, handler, notifier);
},

/**
Expand Down Expand Up @@ -580,8 +629,6 @@ export const prepareSmartWallet = (baggage, shared) => {
const { zoe, agoricNames, invitationBrand, invitationIssuer } =
shared;

await null;

const invitationFromSpec = makeInvitationsHelper(
zoe,
agoricNames,
Expand All @@ -590,26 +637,49 @@ export const prepareSmartWallet = (baggage, shared) => {
state.offerToInvitationMakers.get,
);

const watcherPromises = [];
for (const seatId of liveOfferSeats.keys()) {
facets.helper.logWalletInfo(`repairing ${seatId}`);
const offerSpec = liveOffers.get(seatId);
const seat = liveOfferSeats.get(seatId);

const invitation = invitationFromSpec(offerSpec.invitationSpec);
const invitationAmount =
await E(invitationIssuer).getAmountOf(invitation);
const watcher = makeOfferWatcher(
facets.helper,
facets.deposit,
offerSpec,
address,
invitationAmount,
seat,
watcherPromises.push(
E.when(
E(invitationIssuer).getAmountOf(invitation),
invitationAmount => {
const watcher = makeOfferWatcher(
facets.helper,
facets.deposit,
offerSpec,
address,
invitationAmount,
seat,
);
return watchOfferOutcomes(watcher, seat);
},
),
);

void watchOfferOutcomes(watcher, seat);
trace(`Repaired seat ${seatId} for wallet ${address}`);
}

await Promise.all(watcherPromises);
},
async repairUnwatchedPurses() {
const { state, facets } = this;
const { helper, self } = facets;
const { invitationPurse, address } = state;

const brandToPurses = getBrandToPurses(walletPurses, self);
trace(`Found ${brandToPurses.values()} purse(s) for ${address}`);
for (const purses of brandToPurses.values()) {
for (const record of purses) {
void helper.watchPurse(record.purse);
trace(`Repaired purse ${record.petname} of ${address}`);
}
}

void helper.watchPurse(invitationPurse);
},

/** @param {import('./offers.js').OfferStatus} offerStatus */
Expand Down Expand Up @@ -898,8 +968,12 @@ export const prepareSmartWallet = (baggage, shared) => {
await watchOfferOutcomes(watcher, seatRef);
} catch (err) {
facets.helper.logWalletError('OFFER ERROR:', err);

// Notify the user
if (watcher) {
if (err.upgradeMessage === 'vat upgraded') {
// The offer watchers will reconnect. Don't reclaim or exit
return;
} else if (watcher) {
watcher.helper.updateStatus({ error: err.toString() });
} else {
facets.helper.updateStatus({
Expand Down Expand Up @@ -1035,13 +1109,15 @@ export const prepareSmartWallet = (baggage, shared) => {
* @param {object} key
*/
repairWalletForIncarnation2(key) {
const { facets } = this;
const { state, facets } = this;

if (key !== shared.secretWalletFactoryKey) {
return;
}

void facets.helper.repairUnwatchedSeats();
void facets.helper.repairUnwatchedPurses();
trace(`repaired wallet ${state.address}`);
},
},
},
Expand Down

0 comments on commit 99fc025

Please sign in to comment.