From 401924c81f607b318c54c0d26049fc6d12867de8 Mon Sep 17 00:00:00 2001 From: swimricky Date: Mon, 13 Nov 2023 11:28:54 -0500 Subject: [PATCH] feat: remove delay, fetch as fast as possible --- frontend/claim_sdk/eventSubscriber.ts | 98 +++++--------------- frontend/integration/integrationTest.test.ts | 1 - frontend/scripts/datadog.ts | 37 -------- 3 files changed, 21 insertions(+), 115 deletions(-) diff --git a/frontend/claim_sdk/eventSubscriber.ts b/frontend/claim_sdk/eventSubscriber.ts index df7d7d99..2b989a4d 100644 --- a/frontend/claim_sdk/eventSubscriber.ts +++ b/frontend/claim_sdk/eventSubscriber.ts @@ -10,14 +10,12 @@ export class TokenDispenserEventSubscriber { programId: anchor.web3.PublicKey timeWindowSecs: number chunkSize: number - delay: number constructor( endpoint: string, programId: anchor.web3.PublicKey, timeWindowSecs: number, chunkSize: number, - delay: number, confirmOpts?: anchor.web3.ConfirmOptions ) { const coder = new BorshCoder(tokenDispenser as Idl) @@ -25,7 +23,6 @@ export class TokenDispenserEventSubscriber { this.eventParser = new anchor.EventParser(this.programId, coder) this.timeWindowSecs = timeWindowSecs this.chunkSize = chunkSize - this.delay = delay confirmOpts = confirmOpts ?? anchor.AnchorProvider.defaultOptions() if ( !confirmOpts.commitment || @@ -54,7 +51,6 @@ export class TokenDispenserEventSubscriber { this.connection.commitment as anchor.web3.Finality ) let batchWithinWindow = true - let batchCount = 0 while (currentBatch.length > 0 && batchWithinWindow) { const currentBatchLastSig = currentBatch[currentBatch.length - 1]?.signature @@ -76,10 +72,6 @@ export class TokenDispenserEventSubscriber { }, this.connection.commitment as anchor.web3.Finality ) - batchCount++ - if (batchCount % 10 === 0) { - await sleep(this.delay) - } } const validTxnSigs = [] @@ -92,32 +84,15 @@ export class TokenDispenserEventSubscriber { } } const validTxnSigChunks = chunkArray(validTxnSigs, this.chunkSize) - let validTxns: Array<{ - signature: string - logs: string[] - blockTime: number - slot: number - }> = [] - for (let i = 0; i < validTxnSigChunks.length; i++) { - const validTxnSigChunk = validTxnSigChunks[i] - const validTxnsChunk = ( - await this.connection.getTransactions(validTxnSigChunk, { - commitment: this.connection.commitment as anchor.web3.Finality, - maxSupportedTransactionVersion: 0, - }) - ).map((txLog) => { - return { - signature: txLog?.transaction.signatures[0] ?? '', - logs: txLog?.meta?.logMessages ?? [], - blockTime: txLog?.blockTime ?? 0, - slot: txLog?.slot ?? 0, - } - }) - validTxns.push(...validTxnsChunk) - if (i % 10 === 0) { - await sleep(this.delay) + + const validTxns = (await this.fetchTxns(validTxnSigChunks)).map((txn) => { + return { + signature: txn?.transaction.signatures[0] ?? '', + logs: txn?.meta?.logMessages ?? [], + blockTime: txn?.blockTime ?? 0, + slot: txn?.slot ?? 0, } - } + }) const txnEvents = validTxns.map((txnLog) => { const eventGen = this.eventParser.parseLogs(txnLog.logs) @@ -140,32 +115,14 @@ export class TokenDispenserEventSubscriber { }) const errorTxnSigChunks = chunkArray(errorTxnSigs, this.chunkSize) - let errorTxns: Array<{ - signature: string - blockTime: number - slot: number - }> = [] - - for (let i = 0; i < errorTxnSigChunks.length; i++) { - const errorTxnSigChunk = errorTxnSigChunks[i] - const errorTxnsChunk = ( - await this.connection.getTransactions(errorTxnSigChunk, { - commitment: this.connection.commitment as anchor.web3.Finality, - maxSupportedTransactionVersion: 0, - }) - ).map((txLog) => { - return { - signature: txLog?.transaction.signatures[0] ?? '', - blockTime: txLog?.blockTime ?? 0, - slot: txLog?.slot ?? 0, - } - }) - errorTxns.push(...errorTxnsChunk) - if (i % 10 === 0) { - await sleep(this.delay) + const errorTxns = (await this.fetchTxns(errorTxnSigChunks)).map((txn) => { + return { + signature: txn?.transaction.signatures[0] ?? '', + blockTime: txn?.blockTime ?? 0, + slot: txn?.slot ?? 0, } - } + }) return { txnEvents, @@ -184,26 +141,13 @@ export class TokenDispenserEventSubscriber { return txn?.blockTime } - private async fetchTxnsSlow(txnSigChunks: any[][]) { - let txns: anchor.web3.VersionedTransactionResponse[] = [] - for (let i = 0; i < txnSigChunks.length; i++) { - const txnSigChunk = txnSigChunks[i] - const txnsChunk = await this.connection.getTransactions(txnSigChunk, { - commitment: this.connection.commitment as anchor.web3.Finality, - maxSupportedTransactionVersion: 0, - }) - txnsChunk.forEach((txLog) => { - if (txLog !== null) { - txns.push(txLog) - } - }) - if (i % 10 === 0) { - await sleep(this.delay) - } - } - } - - private async fetchTxnsFast(txnSigChunks: any[][]) { + /** + * This fetches all the txns by sending each chunk asynchronously as fast as possible. + * Assumes that RPC node we're using will not rate-limit. + * @param txnSigChunks + * @private + */ + private async fetchTxns(txnSigChunks: any[][]) { let txns: anchor.web3.VersionedTransactionResponse[] = [] await Promise.all( txnSigChunks.map(async (txnSigChunk) => { diff --git a/frontend/integration/integrationTest.test.ts b/frontend/integration/integrationTest.test.ts index e586b4fc..722dd105 100644 --- a/frontend/integration/integrationTest.test.ts +++ b/frontend/integration/integrationTest.test.ts @@ -107,7 +107,6 @@ describe('integration test', () => { tokenDispenserPid, tenMinTimeWindow, 50, - 1, // not worried about rate limit when testing locally confirmOpts ) diff --git a/frontend/scripts/datadog.ts b/frontend/scripts/datadog.ts index 798dde78..3f150ed8 100644 --- a/frontend/scripts/datadog.ts +++ b/frontend/scripts/datadog.ts @@ -42,43 +42,6 @@ async function main() { const configuration = client.createConfiguration() const apiInstance = new v1.EventsApi(configuration) - // # An example is: PriceFeedOfflineCheck-Crypto.AAVE/USD - // aggregation_key = f"{self.check.__class__.__name__}-{self.check.state().symbol}" - // - // if self.check.__class__.__bases__ == (PublisherCheck,): - // # Add publisher key to the aggregation key to separate different faulty publishers - // # An example would be: PublisherPriceCheck-Crypto.AAVE/USD-9TvAYCUkGajRXs.... - // aggregation_key += "-" + self.check.state().public_key.key - // - // event = DatadogAPIEvent( - // aggregation_key=aggregation_key, - // title=text.split("\n")[0], - // text=text, - // tags=[ - // "service:observer", - // f"network:{self.context['network']}", - // f"symbol:{self.check.state().symbol}", - // f"check:{self.check.__class__.__name__}", - // ], - // alert_type=EventAlertType.WARNING, - // source_type_name="my_apps", - // ) - // 74000000000 - // const { - // claimant, - // claimAmount, - // ecosystem, - // address, - // } = event; - // example datadog event - // title: Crypto.GRAIL/USD is too far at the price service. - // aggregation_key: PriceFeedCrossChainDeviationCheck-Crypto.GRAIL/USD - // text: - // Crypto.GRAIL/USD is too far at the price service. - // - // Price: 1876.14745716 - // Price at price service: 1618.41120956 - const txnEventRequests = createTxnEventRequest(txnEvents) await Promise.all( txnEventRequests.map((txnEventRequest) => {