Skip to content

Commit

Permalink
refactor(vtransfer): transfer interceptor updates (#9470)
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Jun 10, 2024
2 parents e193e66 + 8d11989 commit 0960242
Showing 1 changed file with 44 additions and 41 deletions.
85 changes: 44 additions & 41 deletions packages/vats/src/transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ const prepareTransferInterceptor = (zone, vowTools) => {
'TransferInterceptorKit',
{
public: TargetAppI,
encodeAckWatcher: M.interface('EncodeAckWatcher', {
ackSender: M.interface('AckSender', {
onFulfilled: ReactionGuard,
}),
sendErrorWatcher: M.interface('SendErrorWatcher', {
nackSender: M.interface('NackSender', {
onRejected: ReactionGuard,
}),
logErrorWatcher: M.interface('LogErrorWatcher', {
errorLogger: M.interface('ErrorLogger', {
onRejected: ReactionGuard,
}),
},
Expand All @@ -55,7 +55,7 @@ const prepareTransferInterceptor = (zone, vowTools) => {
{
public: {
async receiveUpcall(obj) {
const { isActiveTap, tap, targetHost } = this.state;
const { isActiveTap, tap } = this.state;

obj.type === VTRANSFER_IBC_EVENT ||
Fail`Invalid upcall argument type ${obj.type}; expected ${bare(VTRANSFER_IBC_EVENT)}`;
Expand All @@ -67,64 +67,67 @@ const prepareTransferInterceptor = (zone, vowTools) => {

// See if the upcall result needs special handling.
if (obj.event === 'writeAcknowledgement') {
const ackMethodData = {
// Respond with the ack (or an active tap's replacement thereof).
const response = {
type: 'IBC_METHOD',
method: 'receiveExecuted',
packet: obj.packet,
ack: obj.acknowledgement,
};
if (isActiveTap) {
retP = watch(retP, this.facets.encodeAckWatcher, {
ackMethodData,
});
} else {
// This is a passive tap, so forward the ack without intervention.
ackMethodData.ack = obj.acknowledgement;
retP = watch(E(targetHost).sendDowncall(ackMethodData));
}
retP = watch(retP, this.facets.sendErrorWatcher, { ackMethodData });
const senderContext = { response, isActiveTap };
// An active tap must wait for the upcall result,
// but a passive tap can proceed immediately.
retP = isActiveTap
? watch(retP, this.facets.ackSender, senderContext)
: /** @type {any} */ (
E(this.facets.ackSender).onFulfilled(undefined, senderContext)
);
// Upon failure, respond with an error acknowledgement.
retP = watch(retP, this.facets.nackSender, { response });
}

// Log errors in the upcall handling.
retP = watch(retP, this.facets.logErrorWatcher, { obj });
// Always log errors.
retP = watch(retP, this.facets.errorLogger, { obj });

if (isActiveTap) {
// If the tap is active, return the promiseVow to the caller. This
// will delay the middleware until fulfilment of the retP chain.
return retP;
}

// Otherwise, passively return nothing.
// For an active tap, return a promise/vow to delay middleware
// until final settlement.
// For a passive tap, return undefined to skip such delays.
return isActiveTap ? retP : undefined;
},
},
/**
* `watch` callback for encoding the raw `ack` return value from an active
* `writeAcknowledgement` tap as base64, then sending down to the
* targetHost.
* A watcher for sending acknowledgements down to the host.
*/
encodeAckWatcher: {
onFulfilled(rawAck, { ackMethodData }) {
// Encode the tap's ack and write it out.
const ack = byteSourceToBase64(coerceToByteSource(rawAck));
ackMethodData = { ...ackMethodData, ack };
return E(this.state.targetHost).sendDowncall(ackMethodData);
ackSender: {
onFulfilled(rawAck, { response, isActiveTap }) {
if (isActiveTap) {
// Incorporate an active tap's replacement ack.
const ack = byteSourceToBase64(coerceToByteSource(rawAck));
response = { ...response, ack };
}
return E(this.state.targetHost).sendDowncall(response);
},
},

/**
* `watch` callback for handling errors in the sending of an ack and
* reifying it as an error acknowledgement.
* A watcher for sending error acknowledgements down to the host.
*/
sendErrorWatcher: {
onRejected(error, { ackMethodData }) {
nackSender: {
onRejected(error, { response }) {
console.error(`Error sending ack:`, error);
const rawAck = JSON.stringify({ error: error.message });
ackMethodData = { ...ackMethodData, ack: byteSourceToBase64(rawAck) };
return E(this.state.targetHost).sendDowncall(ackMethodData);
const nack = { ...response, ack: byteSourceToBase64(rawAck) };
return E(this.state.targetHost).sendDowncall(nack);
},
},
/** `watch` callback for logging errors in the upcall handling. */
logErrorWatcher: {

/**
* A watcher for logging errors.
*/
errorLogger: {
onRejected(error, { obj }) {
console.error(`Error in handling of`, obj, error);
// Don't propagate the error any further.
},
},
},
Expand Down

0 comments on commit 0960242

Please sign in to comment.