Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding parallelism to the ETH, ERC20, MATIC pipelines #162

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
49 changes: 35 additions & 14 deletions blockchains/erc20/erc20_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,45 +91,66 @@ class ERC20Worker extends BaseWorker {
}

async work() {
const result = constants.EXPORT_BLOCKS_LIST ?
this.getBlocksListInterval() :
await nextIntervalCalculator(this);
const requestIntervals = await nextIntervalCalculator(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you removing the 'export block list' feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've completely missed that. But for the time being, I'd remove this part of the code and rework it later on. It needs to be reworked to fit in with this functionality. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree — let's integrate it again in a follow-up PR so as not to slow this one down. It would be very similar to what you do in nextIntervalCalculator currently - generate a sequence of intervals with the parallelism in mind.

if (requestIntervals.length === 0) return [];

if (!result.success) {
return [];
}

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);
logger.info(
`Fetching transfer events for interval ${requestIntervals[0].fromBlock}:` +
`${requestIntervals[requestIntervals.length - 1].toBlock}`);

let events = [];
let overwritten_events = [];
const timestampsCache = new TimestampsCache();
if ('extract_exact_overwrite' === constants.CONTRACT_MODE) {
if (this.allOldContracts.length > 0) {
events = await getPastEvents(this.web3, result.fromBlock, result.toBlock, this.allOldContracts, timestampsCache);
events = [].concat(...await Promise.all(
requestIntervals.map(async (requestInterval) => {
return await getPastEvents(
this.web3,
requestInterval.fromBlock,
requestInterval.toBlock,
this.allOldContracts,
timestampsCache);
})));
changeContractAddresses(events, this.contractsOverwriteArray);
}

if (this.contractsUnmodified.length > 0) {
const rawEvents = await getPastEvents(this.web3, result.fromBlock, result.toBlock, this.contractsUnmodified,
timestampsCache);
const rawEvents = [].concat(...await Promise.all(
requestIntervals.map(async (requestInterval) => {
return await getPastEvents(
this.web3,
requestInterval.fromBlock,
requestInterval.toBlock,
this.contractsUnmodified,
timestampsCache);
})));
events.push(...rawEvents);
}
}
else {
events = await getPastEvents(this.web3, result.fromBlock, result.toBlock, null, timestampsCache);
events = [].concat(...await Promise.all(
requestIntervals.map(async (requestInterval) => {
return await getPastEvents(
this.web3,
requestInterval.fromBlock,
requestInterval.toBlock,
null,
timestampsCache);
})));
if ('extract_all_append' === constants.CONTRACT_MODE) {
overwritten_events = extractChangedContractAddresses(events, this.contractsOverwriteArray);
}
}

if (events.length > 0) {
extendEventsWithPrimaryKey(events, overwritten_events);
logger.info(`Setting primary keys ${events.length} messages for blocks ${result.fromBlock}:${result.toBlock}`);
logger.info(`Setting primary keys ${events.length} messages for blocks ${requestIntervals[0].fromBlock}:`+
`${requestIntervals[requestIntervals.length - 1].toBlock}`);
this.lastPrimaryKey = events[events.length - 1].primaryKey;
}

this.lastExportedBlock = result.toBlock;
this.lastExportedBlock = requestIntervals[requestIntervals.length - 1].toBlock;
const resultEvents = events.concat(overwritten_events);

// If overwritten events have been generated, they need to be merged into the original events
Expand Down
120 changes: 79 additions & 41 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
const Web3 = require('web3');
const jayson = require('jayson/promise');
const { filterErrors } = require('./lib/filter_errors');
const constants = require('./lib/constants');
const { logger } = require('../../lib/logger');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { transactionOrder, stableSort } = require('./lib/util');
const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('./lib/web3_wrapper');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const BaseWorker = require('../../lib/worker_base');
const { FeesDecoder } = require('./lib/fees_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { filterErrors } = require('./lib/filter_errors');
const { transactionOrder, stableSort } = require('./lib/util');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');


class ETHWorker extends BaseWorker {
constructor() {
Expand All @@ -29,6 +30,32 @@ class ETHWorker extends BaseWorker {
this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3, this.web3Wrapper);
}

async ethClientRequestWithRetry(...params) {
let retries = 0;
let retryIntervalMs = 0;
while (retries < constants.MAX_RETRIES) {
try {
const response = await this.ethClient.request(...params);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should `(...params)` be spread again? I thought that `request` would work with an array.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it would work with an array if it works with a batch of requests. Technically, [] is a batch, but I'd have to rework how the parameters are given. Currently we have

('trace_filter', [{
  fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
  toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
}])

if (response.error || response.result === null) {
retries++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we even wait if the retries number is reached at this point, think MAX_RETRIES=0

Copy link
Contributor

@WonderBeat WonderBeat Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should definitely wait. I suggest using a retry library; it's error-prone to write one yourself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect MAX_RETRIES to specify the number of retries, that is, attempts to re-send after the initial send. Instead, if it is 0, we would not send the request at all. If it is 1, we would send one time and, on error, wait — but then not retry. Seems to me like the logic is a bit off.

retryIntervalMs += (constants.BACKOFF_RETRY_STEP * retries);
logger.error(`${params[0]} failed. Reason: ${response.error}. Retrying for ${retries} time`);
YordanPavlov marked this conversation as resolved.
Show resolved Hide resolved
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
} else {
return response;
}
} catch(err) {
retries++;
retryIntervalMs += (constants.BACKOFF_RETRY_STEP * retries);
logger.error(
`Try block in ${params[0]} failed. Reason: ${err.toString()}. Waiting ${retryIntervalMs} and retrying for ${retries} time`
);
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
}
}
return Promise.reject(`${params[0]} failed after ${retries} retries`);
}

parseEthInternalTrx(result) {
const traces = filterErrors(result);

Expand All @@ -41,13 +68,15 @@ class ETHWorker extends BaseWorker {
}

fetchEthInternalTrx(fromBlock, toBlock) {
return this.ethClient.request('trace_filter', [{
logger.info(`Fetching internal transactions for blocks ${fromBlock}:${toBlock}`);
return this.ethClientRequestWithRetry('trace_filter', [{
fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
}]).then((data) => this.parseEthInternalTrx(data['result']));
}

async fetchBlocks(fromBlock, toBlock) {
logger.info(`Fetching block info for blocks ${fromBlock}:${toBlock}`);
const blockRequests = [];
for (let i = fromBlock; i <= toBlock; i++) {
blockRequests.push(
Expand All @@ -67,36 +96,35 @@ class ETHWorker extends BaseWorker {
}

async fetchReceipts(blockNumbers) {
const responses = [];

const batch = [];
for (const blockNumber of blockNumbers) {
const req = this.ethClient.request(constants.RECEIPTS_API_METHOD, [this.web3Wrapper.parseNumberToHex(blockNumber)], undefined, false);
responses.push(this.ethClient.request([req]));
batch.push(
this.ethClient.request(
constants.RECEIPTS_API_METHOD,
[this.web3Wrapper.parseNumberToHex(blockNumber)],
undefined,
false
)
);
}

const finishedRequests = await Promise.all(responses);
const finishedRequests = await this.ethClientRequestWithRetry(batch);
const result = {};

finishedRequests.forEach((blockResponses) => {
if (!blockResponses) return;

blockResponses.forEach((blockResponse) => {
if (blockResponse.result) {
blockResponse.result.forEach((receipt) => {
result[receipt.transactionHash] = receipt;
});
}
else {
throw new Error(JSON.stringify(blockResponse));
}
});
finishedRequests.forEach((response) => {
if (response.result) {
response.result.forEach((receipt) => {
result[receipt.transactionHash] = receipt;
});
}
else {
throw new Error(JSON.stringify(response));
}
});

return result;
}

async fetchTracesBlocksAndReceipts(fromBlock, toBlock) {
logger.info(`Fetching traces for blocks ${fromBlock}:${toBlock}`);
const [traces, blocks] = await Promise.all([
this.fetchEthInternalTrx(fromBlock, toBlock),
this.fetchBlocks(fromBlock, toBlock)
Expand All @@ -108,14 +136,19 @@ class ETHWorker extends BaseWorker {
}

async getPastEvents(fromBlock, toBlock, traces, blocks, receipts) {
logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
let events = [];
if (fromBlock === 0) {
logger.info('Adding the GENESIS transfers');
events.push(...getGenesisTransfers(this.web3));
}

events.push(... await this.getPastTransferEvents(traces, blocks));
events.push(... await this.getPastTransactionEvents(blocks.values(), receipts));
const transferEvents = await this.getPastTransferEvents(traces, blocks);
for (const transfer of transferEvents) events.push(transfer);

const transactionEvents = await this.getPastTransactionEvents(blocks.values(), receipts);
for (const trx of transactionEvents) events.push(trx);

if (fromBlock <= DAO_HACK_FORK_BLOCK && DAO_HACK_FORK_BLOCK <= toBlock) {
logger.info('Adding the DAO hack transfers');
events = injectDAOHackTransfers(events);
Expand All @@ -142,23 +175,29 @@ class ETHWorker extends BaseWorker {
const decoded_transactions = this.feesDecoder.getFeesFromTransactionsInBlock(block, receipts);
const blockNumber = this.web3Wrapper.parseHexToNumber(block.number);
if (constants.IS_ETH && blockNumber >= constants.SHANGHAI_FORK_BLOCK) {
decoded_transactions.push(... await this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber));
const temp_decoded_transactions = await this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber);
for (const trx of temp_decoded_transactions) {
decoded_transactions.push(trx);
}
}
for (const trx of decoded_transactions) {
result.push(trx);
}
result.push(...decoded_transactions);
}

return result;
}

async work() {
const result = await nextIntervalCalculator(this);
if (!result.success) {
return [];
}
const requestIntervals = await nextIntervalCalculator(this);
if (requestIntervals.length === 0) return [];

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);
const [traces, blocks, receipts] = await this.fetchTracesBlocksAndReceipts(result.fromBlock, result.toBlock);
const events = await this.getPastEvents(result.fromBlock, result.toBlock, traces, blocks, receipts);
const events = [].concat(...await Promise.all(
requestIntervals.map(async (requestInterval) => {
const [traces, blocks, receipts] = await this.fetchTracesBlocksAndReceipts(requestInterval.fromBlock, requestInterval.toBlock);
return await this.getPastEvents(requestInterval.fromBlock, requestInterval.toBlock, traces, blocks, receipts);
})
));

if (events.length > 0) {
stableSort(events, transactionOrder);
Expand All @@ -169,8 +208,7 @@ class ETHWorker extends BaseWorker {
this.lastPrimaryKey += events.length;
}

this.lastExportedBlock = result.toBlock;

this.lastExportedBlock = requestIntervals[requestIntervals.length - 1].toBlock;
return events;
}

Expand Down
26 changes: 16 additions & 10 deletions blockchains/eth/lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,27 @@ const ETH_WITHDRAWAL = 'withdrawal';
const LONDON_FORK_BLOCK = 12965000;
const SHANGHAI_FORK_BLOCK = 17034871;
const IS_ETH = parseInt(process.env.IS_ETH || '1');
const MAX_RETRIES = parseInt(process.env.MAX_RETRIES) || 3;
const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3');
const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '100');
const BACKOFF_RETRY_STEP = parseInt(process.env.BACKOFF_RETRY_STEP || '2000');
const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts';
const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || '1');
const NODE_URL = process.env.NODE_URL || process.env.PARITY_URL || 'http://localhost:8545/';
const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30');

module.exports = {
BLOCK_INTERVAL,
CONFIRMATIONS,
NODE_URL,
LOOP_INTERVAL_CURRENT_MODE_SEC,
BURN_ADDRESS,
ETH_WITHDRAWAL,
IS_ETH,
LONDON_FORK_BLOCK,
SHANGHAI_FORK_BLOCK,
RECEIPTS_API_METHOD
IS_ETH,
NODE_URL,
MAX_RETRIES,
BURN_ADDRESS,
CONFIRMATIONS,
BLOCK_INTERVAL,
ETH_WITHDRAWAL,
LONDON_FORK_BLOCK,
BACKOFF_RETRY_STEP,
SHANGHAI_FORK_BLOCK,
RECEIPTS_API_METHOD,
MAX_CONCURRENT_REQUESTS,
LOOP_INTERVAL_CURRENT_MODE_SEC
};
Loading