Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore]: Store Refactor #47

Merged
merged 5 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 51 additions & 29 deletions src/collector/layer-zero/layer-zero.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
UlnConfigStruct,
UlnConfigStructOutput,
} from 'src/contracts/ReceiveULN302';
import { AmbPayload } from 'src/store/types/store.types';
import { AMBMessage, AMBProof } from 'src/store/store.types';
import { LayerZeroEnpointV2__factory } from 'src/contracts';
import { Resolver, loadResolver } from 'src/resolvers/resolver';
import { ParsePayload } from 'src/payload/decode.payload';
Expand All @@ -47,6 +47,11 @@ interface LayerZeroWorkerDataWithMapping extends LayerZeroWorkerData {
layerZeroChainIdMap: Record<string, string>;
}

interface LayerZeroPayloadData {
messageIdentifier: string,
payload: string,
}

class LayerZeroWorker {
private readonly config: LayerZeroWorkerDataWithMapping;
private readonly chainId: string;
Expand All @@ -72,7 +77,7 @@ class LayerZeroWorker {
this.chainId = this.config.chainId;
this.layerZeroChainIdMap = this.config.layerZeroChainIdMap;
this.incentivesAddresses = this.config.incentivesAddresses;
this.store = new Store(this.chainId);
this.store = new Store();
this.provider = this.initializeProvider(this.config.rpc);
this.logger = this.initializeLogger(this.chainId);
this.receiveULN302 = ReceiveULN302__factory.connect(
Expand Down Expand Up @@ -390,37 +395,48 @@ class LayerZeroWorker {
);
const transactionBlockNumber =
await this.resolver.getTransactionBlockNumber(log.blockNumber);
await this.store.setAmb(
{
messageIdentifier: decodedMessage.messageIdentifier,
amb: 'layer-zero',
sourceChain: srcEidMapped.toString(),
destinationChain: dstEidMapped.toString(),
sourceEscrow: packet.sender,
payload: decodedMessage.message,
recoveryContext: '0x',
blockNumber: log.blockNumber,
transactionBlockNumber,
blockHash: log.blockHash,
transactionHash: log.transactionHash,
},
log.transactionHash,

const messageIdentifier = '0x' + decodedMessage.messageIdentifier;
const ambMessage: AMBMessage = {
messageIdentifier,

amb: 'layer-zero',
fromChainId: srcEidMapped.toString(),
toChainId: dstEidMapped.toString(),
fromIncentivesAddress: '0x' + packet.sender.slice(24), // Keep only the relevant bytes (i.e. discard the first 12 bytes)
// toIncentivesAddress: , //TODO

incentivesPayload: '0x' + packet.message,

transactionBlockNumber,

blockNumber: log.blockNumber,
blockHash: log.blockHash,
transactionHash: log.transactionHash,
}

await this.store.setAMBMessage(
this.chainId,
ambMessage,
);

const payloadHash = this.calculatePayloadHash(
packet.guid,
packet.message,
);

await this.store.setPayload('layer-zero', 'ambMessage', payloadHash, {
messageIdentifier: decodedMessage.messageIdentifier,
destinationChain: dstEidMapped,
payload: encodedPayload,
});
await this.store.setAdditionalAMBData<LayerZeroPayloadData>(
'layer-zero',
payloadHash.toLowerCase(),
{
messageIdentifier,
payload: encodedPayload
},
);

this.logger.info(
{
messageIdentifier: decodedMessage.messageIdentifier,
messageIdentifier,
transactionHash: log.transactionHash,
payloadHash
},
Expand Down Expand Up @@ -469,7 +485,10 @@ class LayerZeroWorker {
{ dvn, decodedHeader, confirmations, proofHash },
'PayloadVerified event decoded.',
);
const payloadData = await this.store.getPayload('layer-zero', 'ambMessage', proofHash);
const payloadData = await this.store.getAdditionalAMBData<LayerZeroPayloadData>(
'layer-zero',
proofHash.toLowerCase()
);
if (!payloadData) {
this.logger.error(
{ proofHash },
Expand All @@ -490,17 +509,20 @@ class LayerZeroWorker {
proofHash,
);
if (isVerifiable) {
const ambPayload: AmbPayload = {
messageIdentifier: '0x' + payloadData.messageIdentifier,
const ambProof: AMBProof = {
messageIdentifier: payloadData.messageIdentifier,

amb: 'layer-zero',
destinationChainId: dstEidMapped.toString(),
fromChainId: srcEidMapped.toString(),
toChainId: dstEidMapped.toString(),

message: payloadData.payload,
messageCtx: '0x',
};
this.logger.info({ proofHash }, `LayerZero proof found.`);
await this.store.submitProof(
await this.store.setAMBProof(
this.layerZeroChainIdMap[decodedHeader.dstEid]!,
ambPayload,
ambProof,
);
} else {
this.logger.debug('Payload could not be verified');
Expand Down
46 changes: 27 additions & 19 deletions src/collector/mock/mock.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import pino from 'pino';
import { convertHexToDecimal, tryErrorToString, wait } from 'src/common/utils';
import { IncentivizedMockEscrow__factory } from 'src/contracts';
import { Store } from 'src/store/store.lib';
import { AmbMessage, AmbPayload } from 'src/store/types/store.types';
import { AMBMessage, AMBProof } from 'src/store/store.types';
import { workerData, MessagePort } from 'worker_threads';
import {
decodeMockMessage,
Expand Down Expand Up @@ -65,7 +65,7 @@ class MockCollectorWorker {
// Get a connection to the redis store.
// The redis store has been wrapped into a lib to make it easier to standardise
// communication between the various components.
this.store = new Store(this.chainId);
this.store = new Store();

// Get an Ethers provider with which to collect the bounties information.
this.provider = this.initializeProvider(this.config.rpc);
Expand Down Expand Up @@ -320,25 +320,30 @@ class MockCollectorWorker {
log.blockNumber
);

const amb: AmbMessage = {
...decodedMessage,
const ambMessage: AMBMessage = {
messageIdentifier: decodedMessage.messageIdentifier,

amb: 'mock',
sourceEscrow: this.config.incentivesAddress,
blockNumber: log.blockNumber,
fromChainId: decodedMessage.sourceChain,
toChainId: decodedMessage.destinationChain,
fromIncentivesAddress: this.config.incentivesAddress,
toIncentivesAddress: messageEvent.recipient,

incentivesPayload: decodedMessage.payload,

transactionBlockNumber,

blockNumber: log.blockNumber,
blockHash: log.blockHash,
transactionHash: log.transactionHash
}

// Set the collect message on-chain. This is not the proof but the raw message.
// It can be used by plugins to facilitate other jobs.
await this.store.setAmb(amb, log.transactionHash);

// Set destination address for the bounty.
await this.store.registerDestinationAddress({
messageIdentifier: amb.messageIdentifier,
destinationAddress: messageEvent.recipient,
});
await this.store.setAMBMessage(
this.chainId,
ambMessage
);

// Encode and sign the message for delivery.
// This is the proof which enables us to submit the transaciton later.
Expand All @@ -348,27 +353,30 @@ class MockCollectorWorker {
const signature = this.signingKey.sign(keccak256(encodedMessage));
const executionContext = encodeSignature(signature);

const destinationChainId = convertHexToDecimal(amb.destinationChain);
const destinationChainId = convertHexToDecimal(ambMessage.toChainId);

// Construct the payload.
const ambPayload: AmbPayload = {
messageIdentifier: amb.messageIdentifier,
const ambPayload: AMBProof = {
messageIdentifier: ambMessage.messageIdentifier,

amb: 'mock',
destinationChainId,
fromChainId: this.chainId,
toChainId: destinationChainId,

message: encodedMessage,
messageCtx: executionContext, // If the generalised incentives implementation does not use the context set it to "0x".
};

this.logger.info(
{
messageIdentifier: amb.messageIdentifier,
messageIdentifier: ambMessage.messageIdentifier,
destinationChainId: destinationChainId,
},
`Mock message found.`,
);

// Submit the proofs to any listeners. If there is a submitter, it will process the proof and submit it.
await this.store.submitProof(destinationChainId, ambPayload);
await this.store.setAMBProof(destinationChainId, ambPayload);
}


Expand Down
26 changes: 15 additions & 11 deletions src/collector/polymer/polymer.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import pino from 'pino';
import { tryErrorToString, wait } from 'src/common/utils';
import { IbcEventEmitter__factory } from 'src/contracts';
import { Store } from 'src/store/store.lib';
import { AmbMessage } from 'src/store/types/store.types';
import { AMBMessage } from 'src/store/store.types';
import { workerData, MessagePort } from 'worker_threads';
import { PolymerWorkerData } from './polymer';
import { AbiCoder, JsonRpcProvider, Log, LogDescription, zeroPadValue } from 'ethers6';
Expand Down Expand Up @@ -40,7 +40,7 @@ class PolymerCollectorSnifferWorker {

this.chainId = this.config.chainId;

this.store = new Store(this.chainId);
this.store = new Store();
this.provider = this.initializeProvider(this.config.rpc);
this.logger = this.initializeLogger(this.chainId);
this.resolver = this.loadResolver(
Expand Down Expand Up @@ -295,26 +295,30 @@ class PolymerCollectorSnifferWorker {
log.blockNumber
);

const amb: AmbMessage = {
const ambMessage: AMBMessage = {
messageIdentifier,

amb: 'polymer',
sourceChain: this.chainId,
destinationChain,
sourceEscrow: event.sourcePortAddress,
payload: packet,
blockNumber: log.blockNumber,
fromChainId: this.chainId,
toChainId: destinationChain,
fromIncentivesAddress: event.sourcePortAddress,

incentivesPayload: packet,

transactionBlockNumber,

blockNumber: log.blockNumber,
blockHash: log.blockHash,
transactionHash: log.transactionHash
};
}

// Set the collect message on-chain. This is not the proof but the raw message.
// It can be used by plugins to facilitate other jobs.
await this.store.setAmb(amb, log.transactionHash);
await this.store.setAMBMessage(this.chainId, ambMessage);

this.logger.info(
{
messageIdentifier: amb.messageIdentifier,
messageIdentifier,
destinationChainId: destinationChain,
},
`Polymer message found.`,
Expand Down
51 changes: 23 additions & 28 deletions src/collector/wormhole/wormhole-engine.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import { decodeWormholeMessage } from 'src/collector/wormhole/wormhole.utils';
import { add0X } from 'src/common/utils';
import { workerData } from 'worker_threads';
import { Store } from 'src/store/store.lib';
import { AmbPayload } from 'src/store/types/store.types';
import { AMBProof } from 'src/store/store.types';
import pino, { LoggerOptions } from 'pino';
import {
WormholeChainConfig,
WormholeChainId,
WormholeRelayerEngineWorkerData,
} from './wormhole.types';
Expand All @@ -35,13 +34,13 @@ class WormholeEngineWorker {
private readonly config: WormholeRelayerEngineWorkerData;

private readonly logger: pino.Logger;
private readonly stores: Map<WormholeChainId, Store>;
private readonly store: Store;

constructor() {
this.config = workerData as WormholeRelayerEngineWorkerData;

this.logger = this.initializeLogger(this.config.loggerOptions);
this.stores = this.loadStores(this.config.wormholeChainConfigs);
this.store = new Store();
}

// Initialization helpers
Expand All @@ -67,17 +66,6 @@ class WormholeEngineWorker {
});
}

private loadStores(
wormholeChainConfig: Map<string, WormholeChainConfig>,
): Map<WormholeChainId, Store> {
const stores: Map<WormholeChainId, Store> = new Map();
for (const [chainId, wormholeConfig] of wormholeChainConfig) {
stores.set(wormholeConfig.wormholeChainId, new Store(chainId));
}

return stores;
}

private async loadWormholeRelayerEngine(): Promise<StandardRelayerApp<StandardRelayerContext>> {
const enviroment = this.config.isTestnet
? Environment.TESTNET
Expand Down Expand Up @@ -206,6 +194,20 @@ class WormholeEngineWorker {
add0X(vaa.payload.toString('hex')),
);

const sourceChainId = this.config.wormholeChainIdMap.get(
vaa.emitterChain,
);
if (sourceChainId == undefined) {
this.logger.warn(
{
vaa,
sourceWormholeChainId: vaa.emitterChain,
},
`Failed to process VAA: source chain id given Wormhole chain id not found.`,
);
return;
}

const destinationChainId = this.config.wormholeChainIdMap.get(
wormholeInfo.destinationWormholeChainId,
);
Expand All @@ -221,10 +223,13 @@ class WormholeEngineWorker {
return;
}

const ambPayload: AmbPayload = {
const ambProof: AMBProof = {
messageIdentifier: wormholeInfo.messageIdentifier,

amb: 'wormhole',
destinationChainId,
fromChainId: sourceChainId,
toChainId: destinationChainId,

message: add0X(vaa.bytes.toString('hex')),
messageCtx: '0x',
};
Expand All @@ -234,17 +239,7 @@ class WormholeEngineWorker {
`Wormhole VAA found.`,
);

const store = this.stores.get(vaa.emitterChain);
if (store != undefined) {
await store.submitProof(destinationChainId, ambPayload);
} else {
this.logger.warn(
{
wormholeVAAEmitterChain: vaa.emitterChain,
},
`No 'Store' found for the Wormhole VAA emitter chain id.`,
);
}
await this.store.setAMBProof(destinationChainId, ambProof);
}
}

Expand Down
Loading