Skip to content

Commit

Permalink
feat: add fast & slow txn fetching methods
Browse files Browse the repository at this point in the history
  • Loading branch information
swimricky committed Nov 10, 2023
1 parent 2f5b335 commit f0d7f31
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 35 deletions.
124 changes: 89 additions & 35 deletions frontend/claim_sdk/eventSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@ export class TokenDispenserEventSubscriber {
programId: anchor.web3.PublicKey
timeWindowSecs: number
chunkSize: number
delay: number

constructor(
endpoint: string,
programId: anchor.web3.PublicKey,
timeWindowSecs: number,
chunkSize: number,
delay: number,
confirmOpts?: anchor.web3.ConfirmOptions
) {
const coder = new BorshCoder(tokenDispenser as Idl)
this.programId = programId
this.eventParser = new anchor.EventParser(this.programId, coder)
this.timeWindowSecs = timeWindowSecs
this.chunkSize = chunkSize
this.delay = delay
confirmOpts = confirmOpts ?? anchor.AnchorProvider.defaultOptions()
if (
!confirmOpts.commitment ||
Expand Down Expand Up @@ -51,6 +54,7 @@ export class TokenDispenserEventSubscriber {
this.connection.commitment as anchor.web3.Finality
)
let batchWithinWindow = true
let batchCount = 0
while (currentBatch.length > 0 && batchWithinWindow) {
const currentBatchLastSig =
currentBatch[currentBatch.length - 1]?.signature
Expand All @@ -72,10 +76,13 @@ export class TokenDispenserEventSubscriber {
},
this.connection.commitment as anchor.web3.Finality
)
batchCount++
if (batchCount % 10 === 0) {
await sleep(this.delay)
}
}

const validTxnSigs = []
// TODO: figure out what to do with error txns
const errorTxnSigs = []
for (const signature of signatures) {
if (signature.err) {
Expand All @@ -85,31 +92,34 @@ export class TokenDispenserEventSubscriber {
}
}
const validTxnSigChunks = chunkArray(validTxnSigs, this.chunkSize)
let txnLogs: Array<{
let validTxns: Array<{
signature: string
logs: string[]
blockTime: number
slot: number
}> = []
await Promise.all(
validTxnSigChunks.map(async (validTxnSigChunk, i) => {
const txns = await this.connection.getTransactions(validTxnSigChunk, {
for (let i = 0; i < validTxnSigChunks.length; i++) {
const validTxnSigChunk = validTxnSigChunks[i]
const validTxnsChunk = (
await this.connection.getTransactions(validTxnSigChunk, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
const txnLogsChunk = txns.map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
logs: txLog?.meta?.logMessages ?? [],
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})
txnLogs.push(...txnLogsChunk)
).map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
logs: txLog?.meta?.logMessages ?? [],
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})
)
validTxns.push(...validTxnsChunk)
if (i % 10 === 0) {
await sleep(this.delay)
}
}

const txnEvents = txnLogs.map((txnLog) => {
const txnEvents = validTxns.map((txnLog) => {
const eventGen = this.eventParser.parseLogs(txnLog.logs)
const events = []
let event = eventGen.next()
Expand All @@ -130,34 +140,36 @@ export class TokenDispenserEventSubscriber {
})

const errorTxnSigChunks = chunkArray(errorTxnSigs, this.chunkSize)
let errorLogs: Array<{
let errorTxns: Array<{
signature: string
blockTime: number
slot: number
}> = []
await Promise.all(
errorTxnSigChunks.map(async (errorTxnSigChunk, i) => {
const errorTxns = await this.connection.getTransactions(
errorTxnSigChunk,
{
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
}
)
const errorTxnLogsChunk = errorTxns.map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}

for (let i = 0; i < errorTxnSigChunks.length; i++) {
const errorTxnSigChunk = errorTxnSigChunks[i]
const errorTxnsChunk = (
await this.connection.getTransactions(errorTxnSigChunk, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
errorLogs.push(...errorTxnLogsChunk)
).map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})
)

errorTxns.push(...errorTxnsChunk)
if (i % 10 === 0) {
await sleep(this.delay)
}
}

return {
txnEvents,
errorLogs,
errorLogs: errorTxns,
}
}

Expand All @@ -171,6 +183,43 @@ export class TokenDispenserEventSubscriber {
// blockTime in unix timestamp (seconds)
return txn?.blockTime
}

private async fetchTxnsSlow(txnSigChunks: any[][]) {
let txns: anchor.web3.VersionedTransactionResponse[] = []
for (let i = 0; i < txnSigChunks.length; i++) {
const txnSigChunk = txnSigChunks[i]
const txnsChunk = await this.connection.getTransactions(txnSigChunk, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
txnsChunk.forEach((txLog) => {
if (txLog !== null) {
txns.push(txLog)
}
})
if (i % 10 === 0) {
await sleep(this.delay)
}
}
}

private async fetchTxnsFast(txnSigChunks: any[][]) {
let txns: anchor.web3.VersionedTransactionResponse[] = []
await Promise.all(
txnSigChunks.map(async (txnSigChunk) => {
const txnsChunk = await this.connection.getTransactions(txnSigChunk, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
txnsChunk.forEach((txLog) => {
if (txLog !== null) {
txns.push(txLog)
}
})
})
)
return txns
}
}

/**
Expand Down Expand Up @@ -213,6 +262,11 @@ function chunkArray(array: any[], chunkSize: number) {
array.slice(i * chunkSize, i * chunkSize + chunkSize)
)
}

async function sleep(delay: number) {
return new Promise((resolve) => setTimeout(resolve, delay))
}

export type TxnInfo = {
signature: string
blockTime: number
Expand Down
1 change: 1 addition & 0 deletions frontend/integration/integrationTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ describe('integration test', () => {
tokenDispenserPid,
tenMinTimeWindow,
50,
1, // not worried about rate limit when testing locally
confirmOpts
)

Expand Down

0 comments on commit f0d7f31

Please sign in to comment.