Skip to content

Commit

Permalink
feat: remove delay, fetch as fast as possible
Browse files Browse the repository at this point in the history
  • Loading branch information
swimricky committed Nov 13, 2023
1 parent f0d7f31 commit 401924c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 115 deletions.
98 changes: 21 additions & 77 deletions frontend/claim_sdk/eventSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,19 @@ export class TokenDispenserEventSubscriber {
programId: anchor.web3.PublicKey
timeWindowSecs: number
chunkSize: number
delay: number

constructor(
endpoint: string,
programId: anchor.web3.PublicKey,
timeWindowSecs: number,
chunkSize: number,
delay: number,
confirmOpts?: anchor.web3.ConfirmOptions
) {
const coder = new BorshCoder(tokenDispenser as Idl)
this.programId = programId
this.eventParser = new anchor.EventParser(this.programId, coder)
this.timeWindowSecs = timeWindowSecs
this.chunkSize = chunkSize
this.delay = delay
confirmOpts = confirmOpts ?? anchor.AnchorProvider.defaultOptions()
if (
!confirmOpts.commitment ||
Expand Down Expand Up @@ -54,7 +51,6 @@ export class TokenDispenserEventSubscriber {
this.connection.commitment as anchor.web3.Finality
)
let batchWithinWindow = true
let batchCount = 0
while (currentBatch.length > 0 && batchWithinWindow) {
const currentBatchLastSig =
currentBatch[currentBatch.length - 1]?.signature
Expand All @@ -76,10 +72,6 @@ export class TokenDispenserEventSubscriber {
},
this.connection.commitment as anchor.web3.Finality
)
batchCount++
if (batchCount % 10 === 0) {
await sleep(this.delay)
}
}

const validTxnSigs = []
Expand All @@ -92,32 +84,15 @@ export class TokenDispenserEventSubscriber {
}
}
const validTxnSigChunks = chunkArray(validTxnSigs, this.chunkSize)
let validTxns: Array<{
signature: string
logs: string[]
blockTime: number
slot: number
}> = []
for (let i = 0; i < validTxnSigChunks.length; i++) {
const validTxnSigChunk = validTxnSigChunks[i]
const validTxnsChunk = (
await this.connection.getTransactions(validTxnSigChunk, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
).map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
logs: txLog?.meta?.logMessages ?? [],
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})
validTxns.push(...validTxnsChunk)
if (i % 10 === 0) {
await sleep(this.delay)

const validTxns = (await this.fetchTxns(validTxnSigChunks)).map((txn) => {
return {
signature: txn?.transaction.signatures[0] ?? '',
logs: txn?.meta?.logMessages ?? [],
blockTime: txn?.blockTime ?? 0,
slot: txn?.slot ?? 0,
}
}
})

const txnEvents = validTxns.map((txnLog) => {
const eventGen = this.eventParser.parseLogs(txnLog.logs)
Expand All @@ -140,32 +115,14 @@ export class TokenDispenserEventSubscriber {
})

const errorTxnSigChunks = chunkArray(errorTxnSigs, this.chunkSize)
let errorTxns: Array<{
signature: string
blockTime: number
slot: number
}> = []

for (let i = 0; i < errorTxnSigChunks.length; i++) {
const errorTxnSigChunk = errorTxnSigChunks[i]
const errorTxnsChunk = (
await this.connection.getTransactions(errorTxnSigChunk, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
).map((txLog) => {
return {
signature: txLog?.transaction.signatures[0] ?? '',
blockTime: txLog?.blockTime ?? 0,
slot: txLog?.slot ?? 0,
}
})

errorTxns.push(...errorTxnsChunk)
if (i % 10 === 0) {
await sleep(this.delay)
const errorTxns = (await this.fetchTxns(errorTxnSigChunks)).map((txn) => {
return {
signature: txn?.transaction.signatures[0] ?? '',
blockTime: txn?.blockTime ?? 0,
slot: txn?.slot ?? 0,
}
}
})

return {
txnEvents,
Expand All @@ -184,26 +141,13 @@ export class TokenDispenserEventSubscriber {
return txn?.blockTime
}

private async fetchTxnsSlow(txnSigChunks: any[][]) {
let txns: anchor.web3.VersionedTransactionResponse[] = []
for (let i = 0; i < txnSigChunks.length; i++) {
const txnSigChunk = txnSigChunks[i]
const txnsChunk = await this.connection.getTransactions(txnSigChunk, {
commitment: this.connection.commitment as anchor.web3.Finality,
maxSupportedTransactionVersion: 0,
})
txnsChunk.forEach((txLog) => {
if (txLog !== null) {
txns.push(txLog)
}
})
if (i % 10 === 0) {
await sleep(this.delay)
}
}
}

private async fetchTxnsFast(txnSigChunks: any[][]) {
/**
* This fetches all the txns by sending each chunk asynchronously as fast as possible.
* Assumes that RPC node we're using will not rate-limit.
* @param txnSigChunks
* @private
*/
private async fetchTxns(txnSigChunks: any[][]) {
let txns: anchor.web3.VersionedTransactionResponse[] = []
await Promise.all(
txnSigChunks.map(async (txnSigChunk) => {
Expand Down
1 change: 0 additions & 1 deletion frontend/integration/integrationTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ describe('integration test', () => {
tokenDispenserPid,
tenMinTimeWindow,
50,
1, // not worried about rate limit when testing locally
confirmOpts
)

Expand Down
37 changes: 0 additions & 37 deletions frontend/scripts/datadog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,6 @@ async function main() {
const configuration = client.createConfiguration()
const apiInstance = new v1.EventsApi(configuration)

// # An example is: PriceFeedOfflineCheck-Crypto.AAVE/USD
// aggregation_key = f"{self.check.__class__.__name__}-{self.check.state().symbol}"
//
// if self.check.__class__.__bases__ == (PublisherCheck,):
// # Add publisher key to the aggregation key to separate different faulty publishers
// # An example would be: PublisherPriceCheck-Crypto.AAVE/USD-9TvAYCUkGajRXs....
// aggregation_key += "-" + self.check.state().public_key.key
//
// event = DatadogAPIEvent(
// aggregation_key=aggregation_key,
// title=text.split("\n")[0],
// text=text,
// tags=[
// "service:observer",
// f"network:{self.context['network']}",
// f"symbol:{self.check.state().symbol}",
// f"check:{self.check.__class__.__name__}",
// ],
// alert_type=EventAlertType.WARNING,
// source_type_name="my_apps",
// )
// 74000000000
// const {
// claimant,
// claimAmount,
// ecosystem,
// address,
// } = event;
// example datadog event
// title: Crypto.GRAIL/USD is too far at the price service.
// aggregation_key: PriceFeedCrossChainDeviationCheck-Crypto.GRAIL/USD
// text:
// Crypto.GRAIL/USD is too far at the price service.
//
// Price: 1876.14745716
// Price at price service: 1618.41120956

const txnEventRequests = createTxnEventRequest(txnEvents)
await Promise.all(
txnEventRequests.map((txnEventRequest) => {
Expand Down

0 comments on commit 401924c

Please sign in to comment.