Skip to content

Commit

Permalink
feat(common & relayer): adds check to prevent relaying messages alrea…
Browse files Browse the repository at this point in the history
…dy relayed
  • Loading branch information
allemanfredi committed May 16, 2024
1 parent e2f46eb commit 6f5bf9e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
8 changes: 5 additions & 3 deletions packages/common/src/Batcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ interface BatcherConfigs {
minBatchSize: number
createBatchIntervalTimeMs: number
onBatch: (batch: any[]) => Promise<any>
onGetValues: () => Promise<any[]>
onResult?: (result: any) => Promise<any[]>
onGetValues: () => Promise<any>
onResult?: (result: any) => Promise<any>
}

class Batcher {
Expand Down Expand Up @@ -39,7 +39,9 @@ class Batcher {
private async _createBatch() {
try {
const values = await this.onGetValues()
this.logger.info(`Current batch size: ${values.length} missing: ${values.length > this.minBatchSize ? 0 : this.minBatchSize - values.length}`)
this.logger.info(
`Current batch size: ${values.length} missing: ${values.length > this.minBatchSize ? 0 : this.minBatchSize - values.length}`,
)
if (values.length >= this.minBatchSize) {
this.logger.info(`Batch found. Processing it ...`)
const result = await this.onBatch(values)
Expand Down
50 changes: 38 additions & 12 deletions packages/relayer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,51 @@ const batcher = new Batcher({
..._val.data,
}),
)
const serializedMessages = messages.map((_message: Message) => _message.serialize())

logger.child({ service: "Relayer" }).info(`Relaying ${serializedMessages.length} messages ...`)
const { request } = await client.simulateContract({
address: process.env.YAHO_ADDRESS as `0x${string}`,
abi: yahoAbi,
functionName: "relayMessagesToAdapters",
args: [serializedMessages],
})
const transactionHash = await client.writeContract(request)
logger.child({ service: "Relayer" }).info(`${serializedMessages.length} messages relayed: ${transactionHash}`)
const executableMessages = []
for (const message of messages) {
const hash = await client.readContract({
address: process.env.YAHO_ADDRESS as `0x${string}`,
abi: yahoAbi,
functionName: "getPendingMessageHash",
args: [message.id],
})
if (hash != "0x0000000000000000000000000000000000000000000000000000000000000000") {
executableMessages.push(message)
} else {
logger.child({ service: "Relayer" }).info(`${message.id} already relayed. Filtering it ...`)
await db.collection("relayedMessages").updateOne(
{ id: message.id },
{
$set: { status: "alreadyRelayed" },
},
)
}
}

let transactionHash = null
if (executableMessages.length) {
const serializedMessages = executableMessages.map((_message: Message) => _message.serialize())

logger.child({ service: "Relayer" }).info(`Relaying ${serializedMessages.length} messages ...`)
const { request } = await client.simulateContract({
address: process.env.YAHO_ADDRESS as `0x${string}`,
abi: yahoAbi,
functionName: "relayMessagesToAdapters",
args: [serializedMessages],
})
transactionHash = await client.writeContract(request)
logger.child({ service: "Relayer" }).info(`${serializedMessages.length} messages relayed: ${transactionHash}`)
}

return {
messages,
messages: executableMessages,
transactionHash,
}
},
onResult: (_result: any) => {
onResult: async (_result: any) => {
const { messages, transactionHash } = _result as { messages: Message[]; transactionHash: string }
if (!messages.length) return null

return Promise.all(
messages.map(({ id }) =>
Expand Down

0 comments on commit 6f5bf9e

Please sign in to comment.