From 0b96bdbbb04b339eb821a61524b1759a4779fbcf Mon Sep 17 00:00:00 2001 From: Lyudmil Danailov Date: Wed, 6 Dec 2023 09:43:50 +0200 Subject: [PATCH 1/2] Different approach to exporter workflow --- bin/export_env_vars.sh | 2 +- blockchains/eth/eth_worker.js | 118 +++++++++--------- blockchains/eth/lib/constants.js | 4 +- blockchains/eth/lib/genesis_transfers.js | 2 +- .../eth/lib/next_interval_calculator.js | 65 ++++------ index.js | 64 ++++++++-- lib/constants.js | 4 + package-lock.json | 41 ++++++ package.json | 1 + test/eth/fetch_events.spec.js | 66 +++++----- test/eth/worker.spec.js | 105 ++++++++-------- 11 files changed, 270 insertions(+), 202 deletions(-) diff --git a/bin/export_env_vars.sh b/bin/export_env_vars.sh index c3b990e5..986e91bf 100755 --- a/bin/export_env_vars.sh +++ b/bin/export_env_vars.sh @@ -11,7 +11,7 @@ export KAFKA_TOPIC="erc20_exporter_test_topic" # 'kubectl -n cardano port-forward cardano-graphql-pod-id --address 172.17.0.1 3100:3100' # replacing with the actual pod id. The IP on which the container can access the host is # usually 172.17.0.1 -export CARDANO_GRAPHQL_URL=http://172.17.0.1:3100/graphql +export CARDANO_GRAPHQL_URL=https://cardano.santiment.net export BNB_CHAIN_START_MSEC=1595549200002 export ZOOKEEPER_SESSION_TIMEOUT=20000 export CONTRACT_MAPPING_FILE_PATH="./test/erc20/contract_mapping/contract_mapping.json" diff --git a/blockchains/eth/eth_worker.js b/blockchains/eth/eth_worker.js index 54d84b92..66afadc8 100644 --- a/blockchains/eth/eth_worker.js +++ b/blockchains/eth/eth_worker.js @@ -1,17 +1,17 @@ const Web3 = require('web3'); const jayson = require('jayson/promise'); -const { filterErrors } = require('./lib/filter_errors'); const constants = require('./lib/constants'); const { logger } = require('../../lib/logger'); -const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack'); -const { getGenesisTransfers } = require('./lib/genesis_transfers'); -const { transactionOrder, stableSort } = require('./lib/util'); -const BaseWorker = require('../../lib/worker_base'); const Web3Wrapper = require('./lib/web3_wrapper'); -const { decodeTransferTrace } = require('./lib/decode_transfers'); +const BaseWorker = require('../../lib/worker_base'); const { FeesDecoder } = require('./lib/fees_decoder'); -const { nextIntervalCalculator } = require('./lib/next_interval_calculator'); +const { filterErrors } = require('./lib/filter_errors'); +const { decodeTransferTrace } = require('./lib/decode_transfers'); +const { getGenesisTransfers } = require('./lib/genesis_transfers'); const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder'); +const { nextIntervalCalculator } = require('./lib/next_interval_calculator'); +const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack'); + class ETHWorker extends BaseWorker { constructor() { @@ -27,6 +27,7 @@ class ETHWorker extends BaseWorker { } this.feesDecoder = new FeesDecoder(this.web3, this.web3Wrapper); this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3, this.web3Wrapper); + this.buffer = []; } parseEthInternalTrx(result) { @@ -41,6 +42,7 @@ class ETHWorker extends BaseWorker { } fetchEthInternalTrx(fromBlock, toBlock) { + logger.info(`Fetching traces info ${fromBlock}:${toBlock}`); return this.ethClient.request('trace_filter', [{ fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock), toBlock: this.web3Wrapper.parseNumberToHex(toBlock) @@ -67,55 +69,59 @@ class ETHWorker extends BaseWorker { } async fetchReceipts(blockNumbers) { - const responses = []; - + const batch = []; for (const blockNumber of blockNumbers) { - const req = this.ethClient.request(constants.RECEIPTS_API_METHOD, [this.web3Wrapper.parseNumberToHex(blockNumber)], undefined, false); - responses.push(this.ethClient.request([req])); + batch.push( + this.ethClient.request( + constants.RECEIPTS_API_METHOD, + [this.web3Wrapper.parseNumberToHex(blockNumber)], + undefined, + false + ) + ); } - - const finishedRequests = await Promise.all(responses); + const finishedRequests = await this.ethClient.request(batch); const result = {}; - finishedRequests.forEach((blockResponses) => { - if (!blockResponses) return; - - blockResponses.forEach((blockResponse) => { - if (blockResponse.result) { - blockResponse.result.forEach((receipt) => { - result[receipt.transactionHash] = receipt; - }); - } - else { - throw new Error(JSON.stringify(blockResponse)); - } - }); + finishedRequests.forEach((response) => { + if (response.result) { + response.result.forEach((receipt) => { + result[receipt.transactionHash] = receipt; + }); + } + else { + throw new Error(JSON.stringify(response)); + } }); return result; } - async fetchTracesBlocksAndReceipts(fromBlock, toBlock) { - logger.info(`Fetching traces for blocks ${fromBlock}:${toBlock}`); - const [traces, blocks] = await Promise.all([ - this.fetchEthInternalTrx(fromBlock, toBlock), - this.fetchBlocks(fromBlock, toBlock) - ]); + async fetchBlocksAndReceipts(fromBlock, toBlock) { + logger.info(`Fetching blocks info ${fromBlock}:${toBlock}`); + const blocks = await this.fetchBlocks(fromBlock, toBlock); logger.info(`Fetching receipts of ${fromBlock}:${toBlock}`); const receipts = await this.fetchReceipts(blocks.keys()); - return [traces, blocks, receipts]; + return [blocks, receipts]; } - async getPastEvents(fromBlock, toBlock, traces, blocks, receipts) { + async fetchData(fromBlock, toBlock) { + return await Promise.all([ + this.fetchEthInternalTrx(fromBlock, toBlock), + this.fetchBlocksAndReceipts(fromBlock, toBlock)]); + } + + transformEvents(fromBlock, toBlock, data) { + const [traces, [blocks, receipts]] = data; let events = []; if (fromBlock === 0) { logger.info('Adding the GENESIS transfers'); events.push(...getGenesisTransfers(this.web3)); } - events.push(... await this.getPastTransferEvents(traces, blocks)); - events.push(... await this.getPastTransactionEvents(blocks.values(), receipts)); + events.push(...this.transformTransferEvents(traces, blocks)); + events.push(...this.transformTransactionEvents(blocks.values(), receipts)); if (fromBlock <= DAO_HACK_FORK_BLOCK && DAO_HACK_FORK_BLOCK <= toBlock) { logger.info('Adding the DAO hack transfers'); events = injectDAOHackTransfers(events); @@ -124,54 +130,44 @@ class ETHWorker extends BaseWorker { return events; } - async getPastTransferEvents(traces, blocksMap) { + transformTransferEvents(traces, blocksMap) { const result = []; - for (let i = 0; i < traces.length; i++) { const block_timestamp = this.web3Wrapper.decodeTimestampFromBlock(blocksMap.get(traces[i]['blockNumber'])); - result.push(decodeTransferTrace(traces[i], block_timestamp, this.web3Wrapper)); + result.push(decodeTransferTrace(traces[i], block_timestamp, this.web3Wrapper));//TODO: Maybe push {blocknumbers: data} } return result; } - async getPastTransactionEvents(blocks, receipts) { + transformTransactionEvents(blocks, receipts) { const result = []; - for (const block of blocks) { const decoded_transactions = this.feesDecoder.getFeesFromTransactionsInBlock(block, receipts); const blockNumber = this.web3Wrapper.parseHexToNumber(block.number); if (constants.IS_ETH && blockNumber >= constants.SHANGHAI_FORK_BLOCK) { - decoded_transactions.push(... await this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber)); + decoded_transactions.push(...this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber)); } result.push(...decoded_transactions); + //TODO: Maybe push {blocknumbers: data} } return result; } +//TODO:s - If you have a [{#:data}] then you can check whether the # just doesnt have data or it's missing + async makeQueueTask(interval) { + const data = await this.fetchData(interval.fromBlock, interval.toBlock); + const transformedData = this.transformEvents(interval.fromBlock, interval.toBlock, data); + transformedData.forEach((data) => this.buffer.push(data)); + } async work() { - const result = await nextIntervalCalculator(this); - if (!result.success) { - return []; - } + const intervals = await nextIntervalCalculator(this); + if (intervals.length === 0) return []; - logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`); - const [traces, blocks, receipts] = await this.fetchTracesBlocksAndReceipts(result.fromBlock, result.toBlock); - const events = await this.getPastEvents(result.fromBlock, result.toBlock, traces, blocks, receipts); - - if (events.length > 0) { - stableSort(events, transactionOrder); - for (let i = 0; i < events.length; i++) { - events[i].primaryKey = this.lastPrimaryKey + i + 1; - } + for (const interval of intervals) this.queue.add(() => this.makeQueueTask(interval)); - this.lastPrimaryKey += events.length; - } - - this.lastExportedBlock = result.toBlock; - - return events; + this.lastExportedBlock = Math.max(intervals[intervals.length - 1].toBlock, this.lastExportTime); } async init() { diff --git a/blockchains/eth/lib/constants.js b/blockchains/eth/lib/constants.js index b804df0d..5a4bc9a6 100644 --- a/blockchains/eth/lib/constants.js +++ b/blockchains/eth/lib/constants.js @@ -3,9 +3,10 @@ const ETH_WITHDRAWAL = 'withdrawal'; const LONDON_FORK_BLOCK = 12965000; const SHANGHAI_FORK_BLOCK = 17034871; const IS_ETH = parseInt(process.env.IS_ETH || '1'); +const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts'; const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3'); const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '100'); -const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts'; +const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || 1); const NODE_URL = process.env.NODE_URL || process.env.PARITY_URL || 'http://localhost:8545/'; const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30'); @@ -13,6 +14,7 @@ module.exports = { BLOCK_INTERVAL, CONFIRMATIONS, NODE_URL, + MAX_CONCURRENT_REQUESTS, LOOP_INTERVAL_CURRENT_MODE_SEC, BURN_ADDRESS, ETH_WITHDRAWAL, diff --git a/blockchains/eth/lib/genesis_transfers.js b/blockchains/eth/lib/genesis_transfers.js index 0255bccb..5d7bca1e 100644 --- a/blockchains/eth/lib/genesis_transfers.js +++ b/blockchains/eth/lib/genesis_transfers.js @@ -8,7 +8,7 @@ const GENESIS_TRANSFERS = fs.readFileSync(path.resolve(__dirname) + '/ethereum_g const GENESIS_TIMESTAMP = 1438269973; -exports.getGenesisTransfers = function(web3) { +exports.getGenesisTransfers = function (web3) { const result = []; GENESIS_TRANSFERS.forEach((transfer) => { const [id, from, to, amount] = transfer; diff --git a/blockchains/eth/lib/next_interval_calculator.js b/blockchains/eth/lib/next_interval_calculator.js index 9392739b..faa4fb20 100644 --- a/blockchains/eth/lib/next_interval_calculator.js +++ b/blockchains/eth/lib/next_interval_calculator.js @@ -1,56 +1,41 @@ const constants = require('./constants'); - -function isNewBlockAvailable(worker) { - return worker.lastExportedBlock < worker.lastConfirmedBlock; -} /** - * Return the next interval to be fetched. - * NOTE: this method modifies the member variables of its argument - * - * @param {*} worker A worker object, member variables would be modified - * @returns An object like so: - * { - * success: Boolean, - * fromBlock: Integer, - * toBlock: Integer - * } + * A function that returns the appropriate array of intervals, + * depending on the progress that the worker's made. + * If the exporter's caught up, we check for a new block. We then check whether the Node + * returns a valid block (sometimes the Node returns an early block, like 3 for example). + * We don't want to get the new blocks right away, so we set a sleep variable. On the next iteration + * the function will return the appropriate array of intervals. + * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. + * @returns {Array} An array of intervals. */ async function nextIntervalCalculator(worker) { - // Check if we need to ask the Node for new Head block. This is an optimization to skip this call when the exporter - // is behind the last seen Head anyways. - const firstNewBlockCheck = isNewBlockAvailable(worker); - if (!firstNewBlockCheck) { - // On the previous cycle we closed the gap to the head of the blockchain. - // Check if there are new blocks now. + if (worker.lastExportedBlock >= worker.lastConfirmedBlock) { const newConfirmedBlock = await worker.web3.eth.getBlockNumber() - constants.CONFIRMATIONS; - if (newConfirmedBlock > worker.lastConfirmedBlock) { - // The Node has progressed + if (worker.lastConfirmedBlock < newConfirmedBlock) { worker.lastConfirmedBlock = newConfirmedBlock; } + worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; + return []; } - if (firstNewBlockCheck || isNewBlockAvailable(worker)) { - // If data was available without asking with Node, we are catching up and should come back straight away - if (firstNewBlockCheck) { - worker.sleepTimeMsec = 0; - } - else { - // If data became available only after asking the Node, we are close to the Head, come back later - worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; - } + worker.sleepTimeMsec = 0; + const progressDifference = worker.lastConfirmedBlock - worker.lastExportedBlock; + const maxInterval = constants.MAX_CONCURRENT_REQUESTS * constants.BLOCK_INTERVAL; + let intervalArrayLength; + if (progressDifference < maxInterval) { + intervalArrayLength = Math.ceil(progressDifference / constants.BLOCK_INTERVAL); + } else { + intervalArrayLength = constants.MAX_CONCURRENT_REQUESTS; + } + return Array.from({ length: intervalArrayLength }, (_, i) => { return { - success: true, - fromBlock: worker.lastExportedBlock + 1, - toBlock: Math.min(worker.lastExportedBlock + constants.BLOCK_INTERVAL, worker.lastConfirmedBlock) + fromBlock: worker.lastExportedBlock + constants.BLOCK_INTERVAL * i + 1, + toBlock: Math.min(worker.lastExportedBlock + constants.BLOCK_INTERVAL * (i + 1), worker.lastConfirmedBlock) }; - } - else { - // The Node has not progressed - worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; - return { success: false }; - } + }); } module.exports = { diff --git a/index.js b/index.js index 068b62ba..0452a60d 100644 --- a/index.js +++ b/index.js @@ -6,10 +6,13 @@ const metrics = require('./lib/metrics'); const { logger } = require('./lib/logger'); const { Exporter } = require('./lib/kafka_storage'); const EXPORTER_NAME = process.env.EXPORTER_NAME || pkg.name; -const { BLOCKCHAIN, EXPORT_TIMEOUT_MLS } = require('./lib/constants'); +const { stableSort, transactionOrder } = require('./blockchains/eth/lib/util'); +const { BLOCKCHAIN, EXPORT_TIMEOUT_MLS, MAX_CONCURRENT_REQUESTS, PQUEUE_MAX_SIZE } = require('./lib/constants'); + const worker = require(`./blockchains/${BLOCKCHAIN}/${BLOCKCHAIN}_worker`); var SegfaultHandler = require('segfault-handler'); +const { BLOCK_INTERVAL } = require('./blockchains/eth/lib/constants'); SegfaultHandler.registerHandler(`${EXPORTER_NAME}_crash.log`); class Main { @@ -30,6 +33,10 @@ class Main { async handleInitPosition() { const lastRecoveredPosition = await this.exporter.getLastPosition(); this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition); + this.currentInterval = { + fromBlock: this.lastProcessedPosition.blockNumber + 1, + toBlock: this.lastProcessedPosition.blockNumber + BLOCK_INTERVAL + }; await this.exporter.savePosition(this.lastProcessedPosition); } @@ -43,6 +50,10 @@ class Main { this.worker = new worker.worker(); await this.worker.init(this.exporter, metrics); await this.handleInitPosition(); + if (BLOCKCHAIN === 'eth') { + const PQueue = (await import('p-queue')).default; + this.worker.queue = new PQueue({ concurrency: MAX_CONCURRENT_REQUESTS }); + } } async init() { @@ -71,22 +82,49 @@ class Main { metrics.lastExportedBlock.set(this.worker.lastExportedBlock); } - async workLoop() { - while (this.shouldWork) { - this.worker.lastRequestStartTime = new Date(); - const events = await this.worker.work(); - - this.worker.lastExportTime = Date.now(); + #inCurrentInterval(blockNumber) { + return blockNumber >= this.currentInterval.fromBlock && blockNumber <= this.currentInterval.toBlock; + } - this.updateMetrics(); - this.lastProcessedPosition = this.worker.getLastProcessedPosition(); + #inNextInterval(blockNumber) { + return ( + blockNumber >= this.currentInterval.fromBlock + BLOCK_INTERVAL && + blockNumber <= this.currentInterval.toBlock + BLOCK_INTERVAL); + } - if (events && events.length > 0) { - await this.exporter.storeEvents(events); + generateKafkaArray(events) { + stableSort(events, transactionOrder); + const kafkaArray = []; + while (events.length > 0) { + if (this.#inCurrentInterval(events[0].blockNumber)) { + events[0].primaryKey = this.lastPrimaryKey + kafkaArray.length + 1; + kafkaArray.push(events.shift()); + } else if (this.#inNextInterval(events[0].blockNumber)) { + events[0].primaryKey = this.lastPrimaryKey + kafkaArray.length + 1; + kafkaArray.push(events.shift()); + this.currentInterval.fromBlock += BLOCK_INTERVAL; + this.currentInterval.toBlock += BLOCK_INTERVAL; + } else { + break; } - await this.exporter.savePosition(this.lastProcessedPosition); - logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); + } + this.lastPrimaryKey += kafkaArray.length; + return kafkaArray; + } + async workLoop() { + while (this.shouldWork) { + if (this.worker.queue.size < PQUEUE_MAX_SIZE) await this.worker.work(); + const kafkaArray = this.generateKafkaArray(this.worker.buffer, this.worker.lastBufferedBlock); + if (kafkaArray.length > 0) { + this.lastProcessedPosition = { + primaryKey: kafkaArray[kafkaArray.length - 1].primaryKey, + blockNumber: kafkaArray[kafkaArray.length - 1].blockNumber + }; + await this.exporter.storeEvents(kafkaArray); + await this.exporter.savePosition(this.lastProcessedPosition); + logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); + } if (this.shouldWork) { await new Promise((resolve) => setTimeout(resolve, this.worker.sleepTimeMsec)); } diff --git a/lib/constants.js b/lib/constants.js index 8dd59f2f..8b08d40d 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -1,13 +1,17 @@ const BLOCKCHAIN = process.env.BLOCKCHAIN; const CONFIG_PATH = process.env.CONFIG_PATH; const EXPORT_BLOCKS_LIST = process.env.EXPORT_BLOCKS_LIST || false; +const PQUEUE_MAX_SIZE = parseInt(process.env.PQUEUE_MAX_SIZE || 100); +const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || 1); const EXPORT_TIMEOUT_MLS = parseInt(process.env.EXPORT_TIMEOUT_MLS) || 1000 * 60 * 5; // 5 minutes const EXPORT_BLOCKS_LIST_MAX_INTERVAL = parseInt(process.env.EXPORT_BLOCKS_LIST_MAX_INTERVAL) || 50; module.exports = { BLOCKCHAIN, CONFIG_PATH, + PQUEUE_MAX_SIZE, EXPORT_BLOCKS_LIST, EXPORT_TIMEOUT_MLS, + MAX_CONCURRENT_REQUESTS, EXPORT_BLOCKS_LIST_MAX_INTERVAL }; diff --git a/package-lock.json b/package-lock.json index a167f498..aad668b6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "node-rdkafka": "2.17.0", "node-zookeeper-client": "^1.1.3", "p-queue": "^7.2.0", + "p-retry": "^6.1.0", "prom-client": "^13.1.0", "segfault-handler": "^1.3.0", "web3": "^1.5.3", @@ -860,6 +861,11 @@ "@types/node": "*" } }, + "node_modules/@types/retry": { + "version": "0.12.2", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.2.tgz", + "integrity": "sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==" + }, "node_modules/@types/secp256k1": { "version": "4.0.6", "resolved": "https://registry.npmjs.org/@types/secp256k1/-/secp256k1-4.0.6.tgz", @@ -3497,6 +3503,17 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/is-network-error": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/is-network-error/-/is-network-error-1.0.0.tgz", + "integrity": "sha512-P3fxi10Aji2FZmHTrMPSNFbNC6nnp4U5juPAIjXPHkUNubi4+qK7vvdsaNpAUwXslhYm9oyjEYTxs1xd/+Ph0w==", + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/is-number": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz", @@ -4603,6 +4620,22 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-retry": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-6.1.0.tgz", + "integrity": "sha512-fJLEQ2KqYBJRuaA/8cKMnqhulqNM+bpcjYtXNex2t3mOXKRYPitAJt9NacSf8XAFzcYahSAbKpobiWDSqHSh2g==", + "dependencies": { + "@types/retry": "0.12.2", + "is-network-error": "^1.0.0", + "retry": "^0.13.1" + }, + "engines": { + "node": ">=16.17" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", @@ -4984,6 +5017,14 @@ "node": ">=8" } }, + "node_modules/retry": { + "version": "0.13.1", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", + "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==", + "engines": { + "node": ">= 4" + } + }, "node_modules/reusify": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", diff --git a/package.json b/package.json index 9d303ead..e3d57acf 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "node-rdkafka": "2.17.0", "node-zookeeper-client": "^1.1.3", "p-queue": "^7.2.0", + "p-retry": "^6.1.0", "prom-client": "^13.1.0", "segfault-handler": "^1.3.0", "web3": "^1.5.3", diff --git a/test/eth/fetch_events.spec.js b/test/eth/fetch_events.spec.js index 50a59cbc..81c1e356 100644 --- a/test/eth/fetch_events.spec.js +++ b/test/eth/fetch_events.spec.js @@ -6,7 +6,7 @@ const { DAO_HACK_FORK_BLOCK } = require('../../blockchains/eth/lib/dao_hack'); -describe('fetch past events', function() { +describe('transform events', function() { const transaction = { blockHash: '0x22854625d4c18b3034461851a6fb181209e77a242adbd923989e7113a60fec56', blockNumber: '0x572559', @@ -58,7 +58,7 @@ describe('fetch past events', function() { transactionsRoot: '0xa7df4bb8858bfc779dae9b59201561394b686cdc942a7b0728aa396f7e35f40f', uncles: [] }); - + const receipts = { '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d' : { @@ -118,12 +118,12 @@ describe('fetch past events', function() { 'transactionPosition': 0, 'type': 'call' }; - + const worker = new eth_worker.worker(); let feeResult = null; let callResult = null; - - beforeEach(async function() { + + beforeEach(function() { feeResult = { from: '0x03b16ab6e23bdbeeab719d8e4c49d63674876253', to: '0x829bd824b016326a401d083b33d092293333a830', @@ -134,7 +134,7 @@ describe('fetch past events', function() { transactionHash: '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d', type: 'fee' }; - + callResult = { from: '0x03b16ab6e23bdbeeab719d8e4c49d63674876253', to: '0xb1690c08e213a35ed9bab7b318de14420fb57d8c', @@ -147,25 +147,25 @@ describe('fetch past events', function() { type: 'call' }; }); - - - it('parse transaction events', async function () { - const result = await worker.getPastTransactionEvents(blocks.values(), receipts); + + + it('parse transaction events', function () { + const result = worker.transformTransactionEvents(blocks.values(), receipts); const expectedResult = [feeResult]; - + assert.deepStrictEqual(expectedResult, result); }); - - it('parse transfer events', async function () { - const result = await worker.getPastTransferEvents([trace], blocks); + + it('parse transfer events', function () { + const result = worker.transformTransferEvents([trace], blocks); const expectedResult = [callResult]; - + assert.deepStrictEqual(expectedResult, result); }); - - it('add genesis events', async function () { - const result = await worker.getPastEvents(0, 1, [trace], blocks, receipts); - + + it('add genesis events', function () { + const result = worker.transformEvents(0, 1, [trace], blocks, receipts); + const firstGenesisEvent = { from: 'GENESIS', to: '0x000d836201318ec6899a67540690382780743280', @@ -176,35 +176,35 @@ describe('fetch past events', function() { transactionHash: 'GENESIS_000d836201318ec6899a67540690382780743280', type: 'genesis' }; - - + + assert.deepStrictEqual(firstGenesisEvent, result[0]); }); - - it('genesis events ordering', async function () { - const result = await worker.getPastEvents(0, 1, [trace], blocks, receipts); - + + it('genesis events ordering', function () { + const result = worker.transformEvents(0, 1, [trace], blocks, receipts); + const genesisEventsInserted = 8894; assert.strictEqual(result.length, genesisEventsInserted + 2); assert.deepStrictEqual(callResult, result[genesisEventsInserted]); assert.deepStrictEqual(feeResult, result[genesisEventsInserted + 1]); }); - - it('DAO hack events', async function () { - const result = await worker.getPastEvents(DAO_HACK_FORK_BLOCK - 1, DAO_HACK_FORK_BLOCK + 1, [trace], blocks, receipts); + + it('DAO hack events', function () { + const result = worker.transformEvents(DAO_HACK_FORK_BLOCK - 1, DAO_HACK_FORK_BLOCK + 1, [trace], blocks, receipts); const expectedEvents = DAO_HACK_ADDRESSES.length + 2; - + assert.deepStrictEqual(expectedEvents, result.length); }); - - it('DAO hack events ordering', async function () { + + it('DAO hack events ordering', function () { // Test that DAO hack events are inserted in between the others feeResult.blockNumber = DAO_HACK_FORK_BLOCK - 1; callResult.blockNumber = DAO_HACK_FORK_BLOCK - 1; - + const eventsResult = [feeResult, callResult]; injectDAOHackTransfers(eventsResult); - + assert.deepStrictEqual(feeResult, eventsResult[0]); assert.deepStrictEqual(callResult, eventsResult[1]); }); diff --git a/test/eth/worker.spec.js b/test/eth/worker.spec.js index 3c80bbf0..8e6807d9 100644 --- a/test/eth/worker.spec.js +++ b/test/eth/worker.spec.js @@ -5,67 +5,68 @@ const v8 = require('v8'); const testNullAction = require('./test_action_null.json'); describe('Test worker', function () { - const worker = new eth_worker.worker(); - let feeResult = null; - let callResult = null; - let feeResultWithPrimaryKey = null; - let callResultWithPrimaryKey = null; - - beforeEach(function () { - feeResult = { - from: '0x03b16ab6e23bdbeeab719d8e4c49d63674876253', - to: '0x829bd824b016326a401d083b33d092293333a830', - value: 14086000000000000, - valueExactBase36: '3up2j2e99ts', - blockNumber: 5711193, - timestamp: 1527814787, - transactionHash: '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d', - type: 'fee' - }; + const worker = new eth_worker.worker(); + let feeResult = null; + let callResult = null; + let feeResultWithPrimaryKey = null; + let callResultWithPrimaryKey = null; - callResult = { - from: '0x03b16ab6e23bdbeeab719d8e4c49d63674876253', - to: '0xb1690c08e213a35ed9bab7b318de14420fb57d8c', - value: 320086793278069500, - valueExactBase36: '2fjpaqu9o0tc', - blockNumber: 5711193, - timestamp: 1527814787, - transactionHash: '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d', - transactionPosition: 0, - type: 'call' - }; + beforeEach(function () { + feeResult = { + from: '0x03b16ab6e23bdbeeab719d8e4c49d63674876253', + to: '0x829bd824b016326a401d083b33d092293333a830', + value: 14086000000000000, + valueExactBase36: '3up2j2e99ts', + blockNumber: 5711193, + timestamp: 1527814787, + transactionHash: '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d', + type: 'fee' + }; - feeResultWithPrimaryKey = v8.deserialize(v8.serialize(feeResult)); - feeResultWithPrimaryKey.primaryKey = 1; + callResult = { + from: '0x03b16ab6e23bdbeeab719d8e4c49d63674876253', + to: '0xb1690c08e213a35ed9bab7b318de14420fb57d8c', + value: 320086793278069500, + valueExactBase36: '2fjpaqu9o0tc', + blockNumber: 5711193, + timestamp: 1527814787, + transactionHash: '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d', + transactionPosition: 0, + type: 'call' + }; - callResultWithPrimaryKey = v8.deserialize(v8.serialize(callResult)); - callResultWithPrimaryKey.primaryKey = 2; - }); + feeResultWithPrimaryKey = v8.deserialize(v8.serialize(feeResult)); + feeResultWithPrimaryKey.primaryKey = 1; + callResultWithPrimaryKey = v8.deserialize(v8.serialize(callResult)); + callResultWithPrimaryKey.primaryKey = 2; + }); - it('test primary key assignment', async function () { - // Overwrite variables and methods that the 'work' method would use internally. - worker.lastConfirmedBlock = 1; - worker.lastExportedBlock = 0; - worker.fetchTracesBlocksAndReceipts = async function () { - return []; - }; - worker.getPastEvents = async function () { - return [feeResult, callResult]; - }; + it('test primary key assignment', async function () { + // Overwrite variables and methods that the 'work' method would use internally. + worker.lastConfirmedBlock = 1; + worker.lastExportedBlock = 0; + worker.fetchEthInternalTrx = async function () { + return []; + }; + worker.fetchBlocksAndReceipts = async function () { + return []; + }; + worker.transformEvents = function () { + return [feeResult, callResult]; + }; - const result = await worker.work(); - - assert.deepStrictEqual(result, [feeResultWithPrimaryKey, callResultWithPrimaryKey]); - }); + const result = await worker.work(); + assert.deepStrictEqual(result, [feeResultWithPrimaryKey, callResultWithPrimaryKey]); + }); }); describe('Test that when action is null parsing would not break', function () { - it('Null action should not break parsing', function () { - const worker = new eth_worker.worker(); - const result = worker.parseEthInternalTrx(testNullAction); + it('Null action should not break parsing', function () { + const worker = new eth_worker.worker(); + const result = worker.parseEthInternalTrx(testNullAction); - assert.deepStrictEqual(result, []); - }); + assert.deepStrictEqual(result, []); + }); }); From 36095f5769fc86aa03042fd2e15938e04e589b1f Mon Sep 17 00:00:00 2001 From: Lyudmil Danailov Date: Thu, 28 Dec 2023 14:38:00 +0200 Subject: [PATCH 2/2] bkp --- blockchains/eth/eth_worker.js | 59 +++++++++++++++++++++++++-------- blockchains/eth/lib/dao_hack.js | 5 +++ index.js | 12 ++++--- 3 files changed, 59 insertions(+), 17 deletions(-) diff --git a/blockchains/eth/eth_worker.js b/blockchains/eth/eth_worker.js index 66afadc8..db35d3f7 100644 --- a/blockchains/eth/eth_worker.js +++ b/blockchains/eth/eth_worker.js @@ -10,7 +10,7 @@ const { decodeTransferTrace } = require('./lib/decode_transfers'); const { getGenesisTransfers } = require('./lib/genesis_transfers'); const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder'); const { nextIntervalCalculator } = require('./lib/next_interval_calculator'); -const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack'); +const { DAO_HACK_FORK_BLOCK, mapAddressToTransfer } = require('./lib/dao_hack'); class ETHWorker extends BaseWorker { @@ -112,49 +112,82 @@ class ETHWorker extends BaseWorker { this.fetchBlocksAndReceipts(fromBlock, toBlock)]); } + /** + * Method for transforming the fetched raw data from the Node. + * The reason behind making such an object is that it isn't necessarily known, + * whether there's data for the block number or some thing has gone wrong. The object would help + * track progress better. + * @param {number} fromBlock + * @param {number} toBlock + * @param {Array} data Array of 1. Array of Traces JSON values 2. Array of Blocks Map and Receipts Array of JSON + * @returns {object} {[blockNumber]: data} pairs + */ transformEvents(fromBlock, toBlock, data) { const [traces, [blocks, receipts]] = data; - let events = []; + let events = {}; + for (let i = fromBlock; i <= toBlock; i++) { + events[i] = []; + } if (fromBlock === 0) { logger.info('Adding the GENESIS transfers'); - events.push(...getGenesisTransfers(this.web3)); + events[0].push(...getGenesisTransfers(this.web3)); } - events.push(...this.transformTransferEvents(traces, blocks)); - events.push(...this.transformTransactionEvents(blocks.values(), receipts)); + const transferEvents = this.transformTransferEvents(traces, blocks); + const transactionEvents = this.transformTransactionEvents(blocks.values(), receipts); + for (let blockNumber in events) { + events[blockNumber].push(...transferEvents[blockNumber]); + events[blockNumber].push(...transactionEvents[blockNumber]); + } if (fromBlock <= DAO_HACK_FORK_BLOCK && DAO_HACK_FORK_BLOCK <= toBlock) { logger.info('Adding the DAO hack transfers'); - events = injectDAOHackTransfers(events); + events[DAO_HACK_FORK_BLOCK] = mapAddressToTransfer(); } return events; } + /** + * + * @param {Array} traces Array of JSON values for the traces + * @param {Map} blocksMap Map of { [blockNumber]: data } values for blocks info + * @returns {object} {[blockNumber]: data} pairs for transfer events' info + */ transformTransferEvents(traces, blocksMap) { - const result = []; + const result = {}; for (let i = 0; i < traces.length; i++) { - const block_timestamp = this.web3Wrapper.decodeTimestampFromBlock(blocksMap.get(traces[i]['blockNumber'])); - result.push(decodeTransferTrace(traces[i], block_timestamp, this.web3Wrapper));//TODO: Maybe push {blocknumbers: data} + const block_number = this.web3Wrapper.parseHexToNumber(traces[i]['blockNumber']); + const block_timestamp = this.web3Wrapper.decodeTimestampFromBlock(blocksMap.get(block_number)); + if (!(block_number in result)) { + result[block_number] = []; + } + result[block_number].push(decodeTransferTrace(traces[i], block_timestamp, this.web3Wrapper)); } return result; } + /** + * + * @param {Array} blocks Block numbers array + * @param {Array} receipts Receipts JSON values array + * @returns {object} {[blockNumber]: data} pairs for transactions events' info + */ transformTransactionEvents(blocks, receipts) { - const result = []; + const result = {}; for (const block of blocks) { + result[block] = []; const decoded_transactions = this.feesDecoder.getFeesFromTransactionsInBlock(block, receipts); const blockNumber = this.web3Wrapper.parseHexToNumber(block.number); if (constants.IS_ETH && blockNumber >= constants.SHANGHAI_FORK_BLOCK) { decoded_transactions.push(...this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber)); } - result.push(...decoded_transactions); - //TODO: Maybe push {blocknumbers: data} + result[blockNumber].push(...decoded_transactions); } return result; } -//TODO:s - If you have a [{#:data}] then you can check whether the # just doesnt have data or it's missing + async makeQueueTask(interval) { const data = await this.fetchData(interval.fromBlock, interval.toBlock); const transformedData = this.transformEvents(interval.fromBlock, interval.toBlock, data); diff --git a/blockchains/eth/lib/dao_hack.js b/blockchains/eth/lib/dao_hack.js index 3b9d99a1..0f1d53ca 100644 --- a/blockchains/eth/lib/dao_hack.js +++ b/blockchains/eth/lib/dao_hack.js @@ -291,6 +291,10 @@ function daoAddressToTransfer(daoHackAddress) { }; } +function mapAddressToTransfer() { + return DAO_HACK_ADDRESSES.map(daoAddressToTransfer); +} + function injectDAOHackTransfers(transfers) { const insertIndex = transfers.findIndex((transfer) => transfer.blockNumber === DAO_HACK_FORK_BLOCK); const transfersToInsert = DAO_HACK_ADDRESSES.map(daoAddressToTransfer); @@ -304,5 +308,6 @@ function injectDAOHackTransfers(transfers) { module.exports = { DAO_HACK_FORK_BLOCK, DAO_HACK_ADDRESSES, + mapAddressToTransfer, injectDAOHackTransfers }; diff --git a/index.js b/index.js index 0452a60d..f3c0109b 100644 --- a/index.js +++ b/index.js @@ -34,12 +34,14 @@ class Main { const lastRecoveredPosition = await this.exporter.getLastPosition(); this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition); this.currentInterval = { - fromBlock: this.lastProcessedPosition.blockNumber + 1, - toBlock: this.lastProcessedPosition.blockNumber + BLOCK_INTERVAL + fromBlock: this.lastProcessedPosition.blockNumber - BLOCK_INTERVAL + 1, + toBlock: this.lastProcessedPosition.blockNumber }; await this.exporter.savePosition(this.lastProcessedPosition); } - +// Start from 0 -> currInterval = <-99 0> +//[ ] +//[ ] -> check should fail #isWorkerSet() { if (this.worker) throw new Error('Worker is already set'); } @@ -95,11 +97,13 @@ class Main { generateKafkaArray(events) { stableSort(events, transactionOrder); const kafkaArray = []; + let initInterval = false; while (events.length > 0) { if (this.#inCurrentInterval(events[0].blockNumber)) { events[0].primaryKey = this.lastPrimaryKey + kafkaArray.length + 1; kafkaArray.push(events.shift()); - } else if (this.#inNextInterval(events[0].blockNumber)) { + if (!initInterval) initInterval = true; + } else if (this.#inNextInterval(events[0].blockNumber) && initInterval) { events[0].primaryKey = this.lastPrimaryKey + kafkaArray.length + 1; kafkaArray.push(events.shift()); this.currentInterval.fromBlock += BLOCK_INTERVAL;