Skip to content

Commit

Permalink
feat: add chunking when fetching txns
Browse files Browse the repository at this point in the history
  • Loading branch information
swimricky committed Nov 10, 2023
1 parent 85a0ca4 commit 2f5b335
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 38 deletions.
88 changes: 61 additions & 27 deletions frontend/claim_sdk/eventSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ export class TokenDispenserEventSubscriber {
connection: anchor.web3.Connection
programId: anchor.web3.PublicKey
timeWindowSecs: number
chunkSize: number

constructor(
endpoint: string,
programId: anchor.web3.PublicKey,
timeWindowSecs: number,
chunkSize: number,
confirmOpts?: anchor.web3.ConfirmOptions
) {
const coder = new BorshCoder(tokenDispenser as Idl)
this.programId = programId
this.eventParser = new anchor.EventParser(this.programId, coder)
this.timeWindowSecs = timeWindowSecs
this.chunkSize = chunkSize
confirmOpts = confirmOpts ?? anchor.AnchorProvider.defaultOptions()
if (
!confirmOpts.commitment ||
Expand Down Expand Up @@ -71,28 +74,40 @@ export class TokenDispenserEventSubscriber {
)
}

const validTxns = []
const validTxnSigs = []
// TODO: figure out what to do with error txns
const errorTxns = []
const errorTxnSigs = []
for (const signature of signatures) {
if (signature.err) {
errorTxns.push(signature.signature)
errorTxnSigs.push(signature.signature)
} else {
validTxns.push(signature.signature)
validTxnSigs.push(signature.signature)
}
}
const txns = await this.connection.getTransactions(validTxns, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
const txnLogs = txns.map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
logs: txLog?.meta?.logMessages ?? [],
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})
const validTxnSigChunks = chunkArray(validTxnSigs, this.chunkSize)
let txnLogs: Array<{
signature: string
logs: string[]
blockTime: number
slot: number
}> = []
await Promise.all(
validTxnSigChunks.map(async (validTxnSigChunk, i) => {
const txns = await this.connection.getTransactions(validTxnSigChunk, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
const txnLogsChunk = txns.map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
logs: txLog?.meta?.logMessages ?? [],
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})
txnLogs.push(...txnLogsChunk)
})
)

const txnEvents = txnLogs.map((txnLog) => {
const eventGen = this.eventParser.parseLogs(txnLog.logs)
Expand All @@ -114,17 +129,31 @@ export class TokenDispenserEventSubscriber {
}
})

const errorTxnInfo = await this.connection.getTransactions(errorTxns, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
const errorLogs: TxnInfo[] = errorTxnInfo.map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})
const errorTxnSigChunks = chunkArray(errorTxnSigs, this.chunkSize)
let errorLogs: Array<{
signature: string
blockTime: number
slot: number
}> = []
await Promise.all(
errorTxnSigChunks.map(async (errorTxnSigChunk, i) => {
const errorTxns = await this.connection.getTransactions(
errorTxnSigChunk,
{
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
}
)
const errorTxnLogsChunk = errorTxns.map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})
errorLogs.push(...errorTxnLogsChunk)
})
)

return {
txnEvents,
Expand Down Expand Up @@ -179,6 +208,11 @@ export function formatTxnEventInfo(txnEvnInfo: TxnEventInfo) {
return formattedEvent
}

function chunkArray(array: any[], chunkSize: number) {
return Array.from({ length: Math.ceil(array.length / chunkSize) }, (_, i) =>
array.slice(i * chunkSize, i * chunkSize + chunkSize)
)
}
export type TxnInfo = {
signature: string
blockTime: number
Expand Down
6 changes: 3 additions & 3 deletions frontend/integration/integrationTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ describe('integration test', () => {
endpoint,
tokenDispenserPid,
tenMinTimeWindow,
50,
confirmOpts
)

Expand Down Expand Up @@ -271,15 +272,14 @@ describe('integration test', () => {
expect(txnEvents[0].event).toBeDefined()
const cosmClaimEvent = txnEvents[0].event!
expect(cosmClaimEvent.claimant.equals(wallet.publicKey)).toBeTruthy()
expect(cosmClaimEvent.ecosystem).toEqual('cosmwasm')
expect(cosmClaimEvent.address).toEqual(testWallets.cosmwasm[0].address())
expect(
new anchor.BN(cosmClaimEvent.claimAmount.toString()).eq(
claimInfo.amount
)
).toBeTruthy()
expectedTreasuryBalance = expectedTreasuryBalance.sub(claimInfo.amount)
const eventRemainingBalance = new anchor.BN(
cosmClaimEvent.remainingBalance.toString()
)
expect(
new anchor.BN(cosmClaimEvent.remainingBalance.toString()).eq(
expectedTreasuryBalance
Expand Down
17 changes: 9 additions & 8 deletions frontend/scripts/datadog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@ import {
} from '../claim_sdk/eventSubscriber'
import * as anchor from '@coral-xyz/anchor'
import {
ERROR,
INFO,
WARNING,
} from '@datadog/datadog-api-client/dist/packages/datadog-api-client-v1/models/EventAlertType'
import { envOrErr } from '../claim_sdk'

const ENDPOINT = envOrErr('ENDPOINT')
const PROGRAM_ID = envOrErr('PROGRAM_ID')
const TIME_WINDOW_SECS = envOrErr('TIME_WINDOW_SECS')
const CLUSTER = envOrErr('CLUSTER')
const TIME_WINDOW_SECS = Number.parseInt(envOrErr('TIME_WINDOW_SECS'), 10)
const CHUNK_SIZE = Number.parseInt(envOrErr('CHUNK_SIZE'), 10)

async function main() {
const tokenDispenserEventSubscriber = new TokenDispenserEventSubscriber(
ENDPOINT,
new anchor.web3.PublicKey(PROGRAM_ID),
Number.parseInt(TIME_WINDOW_SECS, 10),
TIME_WINDOW_SECS,
CHUNK_SIZE,
{
commitment: 'confirmed',
}
Expand Down Expand Up @@ -120,8 +123,7 @@ function createTxnEventRequest(
tags: [
`claimant:${claimant}`,
`ecosystem:${ecosystem}`,
//TODO: add cluster name to the tag
`network:solana-mainnet`,
`network:${CLUSTER}`,
`service:token-dispenser-event-subscriber`,
],
},
Expand All @@ -137,10 +139,9 @@ function createErrorLogRequest(
body: {
title: `error-${errorLog.signature}`,
text: JSON.stringify(errorLog),
alertType: WARNING,
alertType: ERROR,
tags: [
//TODO: add cluster name to the tag
`network:solana-mainnet`,
`network:${CLUSTER}`,
`service:token-dispenser-event-subscriber`,
],
},
Expand Down

0 comments on commit 2f5b335

Please sign in to comment.