Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(vtransfer): transfer interceptor updates #9470

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 44 additions & 41 deletions packages/vats/src/transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ const prepareTransferInterceptor = (zone, vowTools) => {
'TransferInterceptorKit',
{
public: TargetAppI,
encodeAckWatcher: M.interface('EncodeAckWatcher', {
ackSender: M.interface('AckSender', {
onFulfilled: ReactionGuard,
}),
sendErrorWatcher: M.interface('SendErrorWatcher', {
nackSender: M.interface('NackSender', {
onRejected: ReactionGuard,
}),
logErrorWatcher: M.interface('LogErrorWatcher', {
errorLogger: M.interface('ErrorLogger', {
onRejected: ReactionGuard,
}),
},
Expand All @@ -55,7 +55,7 @@ const prepareTransferInterceptor = (zone, vowTools) => {
{
public: {
async receiveUpcall(obj) {
const { isActiveTap, tap, targetHost } = this.state;
const { isActiveTap, tap } = this.state;

obj.type === VTRANSFER_IBC_EVENT ||
Fail`Invalid upcall argument type ${obj.type}; expected ${bare(VTRANSFER_IBC_EVENT)}`;
Expand All @@ -67,64 +67,67 @@ const prepareTransferInterceptor = (zone, vowTools) => {

// See if the upcall result needs special handling.
if (obj.event === 'writeAcknowledgement') {
const ackMethodData = {
// Respond with the ack (or an active tap's replacement thereof).
const response = {
type: 'IBC_METHOD',
method: 'receiveExecuted',
packet: obj.packet,
ack: obj.acknowledgement,
};
if (isActiveTap) {
retP = watch(retP, this.facets.encodeAckWatcher, {
ackMethodData,
});
} else {
// This is a passive tap, so forward the ack without intervention.
ackMethodData.ack = obj.acknowledgement;
retP = watch(E(targetHost).sendDowncall(ackMethodData));
}
retP = watch(retP, this.facets.sendErrorWatcher, { ackMethodData });
const senderContext = { response, isActiveTap };
// An active tap must wait for the upcall result,
// but a passive tap can proceed immediately.
retP = isActiveTap
? watch(retP, this.facets.ackSender, senderContext)
: /** @type {any} */ (
E(this.facets.ackSender).onFulfilled(undefined, senderContext)
);
// Upon failure, respond with an error acknowledgement.
retP = watch(retP, this.facets.nackSender, { response });
}

// Log errors in the upcall handling.
retP = watch(retP, this.facets.logErrorWatcher, { obj });
// Always log errors.
retP = watch(retP, this.facets.errorLogger, { obj });

if (isActiveTap) {
// If the tap is active, return the promiseVow to the caller. This
// will delay the middleware until fulfilment of the retP chain.
return retP;
}

// Otherwise, passively return nothing.
// For an active tap, return a promise/vow to delay middleware
// until final settlement.
// For a passive tap, return undefined to skip such delays.
return isActiveTap ? retP : undefined;
},
},
/**
* `watch` callback for encoding the raw `ack` return value from an active
* `writeAcknowledgement` tap as base64, then sending down to the
* targetHost.
* A watcher for sending acknowledgements down to the host.
*/
encodeAckWatcher: {
onFulfilled(rawAck, { ackMethodData }) {
// Encode the tap's ack and write it out.
const ack = byteSourceToBase64(coerceToByteSource(rawAck));
ackMethodData = { ...ackMethodData, ack };
return E(this.state.targetHost).sendDowncall(ackMethodData);
ackSender: {
onFulfilled(rawAck, { response, isActiveTap }) {
if (isActiveTap) {
// Incorporate an active tap's replacement ack.
const ack = byteSourceToBase64(coerceToByteSource(rawAck));
response = { ...response, ack };
}
return E(this.state.targetHost).sendDowncall(response);
},
},

/**
* `watch` callback for handling errors in the sending of an ack and
* reifying it as an error acknowledgement.
* A watcher for sending error acknowledgements down to the host.
*/
sendErrorWatcher: {
onRejected(error, { ackMethodData }) {
nackSender: {
onRejected(error, { response }) {
console.error(`Error sending ack:`, error);
const rawAck = JSON.stringify({ error: error.message });
ackMethodData = { ...ackMethodData, ack: byteSourceToBase64(rawAck) };
return E(this.state.targetHost).sendDowncall(ackMethodData);
const nack = { ...response, ack: byteSourceToBase64(rawAck) };
return E(this.state.targetHost).sendDowncall(nack);
},
},
/** `watch` callback for logging errors in the upcall handling. */
logErrorWatcher: {

/**
* A watcher for logging errors.
*/
errorLogger: {
onRejected(error, { obj }) {
console.error(`Error in handling of`, obj, error);
// Don't propagate the error any further.
},
},
},
Expand Down
Loading