Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving on the parallelism idea #167

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/export_env_vars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export KAFKA_TOPIC="erc20_exporter_test_topic"
# 'kubectl -n cardano port-forward cardano-graphql-pod-id --address 172.17.0.1 3100:3100'
# replacing with the actual pod id. The IP on which the container can access the host is
# usually 172.17.0.1
export CARDANO_GRAPHQL_URL=http://172.17.0.1:3100/graphql
export CARDANO_GRAPHQL_URL=https://cardano.santiment.net
export BNB_CHAIN_START_MSEC=1595549200002
export ZOOKEEPER_SESSION_TIMEOUT=20000
export CONTRACT_MAPPING_FILE_PATH="./test/erc20/contract_mapping/contract_mapping.json"
118 changes: 57 additions & 61 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
const Web3 = require('web3');
const jayson = require('jayson/promise');
const { filterErrors } = require('./lib/filter_errors');
const constants = require('./lib/constants');
const { logger } = require('../../lib/logger');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { transactionOrder, stableSort } = require('./lib/util');
const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('./lib/web3_wrapper');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const BaseWorker = require('../../lib/worker_base');
const { FeesDecoder } = require('./lib/fees_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { filterErrors } = require('./lib/filter_errors');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');


class ETHWorker extends BaseWorker {
constructor() {
Expand All @@ -27,6 +27,7 @@ class ETHWorker extends BaseWorker {
}
this.feesDecoder = new FeesDecoder(this.web3, this.web3Wrapper);
this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3, this.web3Wrapper);
this.buffer = [];
}

parseEthInternalTrx(result) {
Expand All @@ -41,6 +42,7 @@ class ETHWorker extends BaseWorker {
}

fetchEthInternalTrx(fromBlock, toBlock) {
logger.info(`Fetching traces info ${fromBlock}:${toBlock}`);
return this.ethClient.request('trace_filter', [{
fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
Expand All @@ -67,55 +69,59 @@ class ETHWorker extends BaseWorker {
}

async fetchReceipts(blockNumbers) {
const responses = [];

const batch = [];
for (const blockNumber of blockNumbers) {
const req = this.ethClient.request(constants.RECEIPTS_API_METHOD, [this.web3Wrapper.parseNumberToHex(blockNumber)], undefined, false);
responses.push(this.ethClient.request([req]));
batch.push(
this.ethClient.request(
constants.RECEIPTS_API_METHOD,
[this.web3Wrapper.parseNumberToHex(blockNumber)],
undefined,
false
)
);
}

const finishedRequests = await Promise.all(responses);
const finishedRequests = await this.ethClient.request(batch);
const result = {};

finishedRequests.forEach((blockResponses) => {
if (!blockResponses) return;

blockResponses.forEach((blockResponse) => {
if (blockResponse.result) {
blockResponse.result.forEach((receipt) => {
result[receipt.transactionHash] = receipt;
});
}
else {
throw new Error(JSON.stringify(blockResponse));
}
});
finishedRequests.forEach((response) => {
if (response.result) {
response.result.forEach((receipt) => {
result[receipt.transactionHash] = receipt;
});
}
else {
throw new Error(JSON.stringify(response));
}
});

return result;
}

async fetchTracesBlocksAndReceipts(fromBlock, toBlock) {
logger.info(`Fetching traces for blocks ${fromBlock}:${toBlock}`);
const [traces, blocks] = await Promise.all([
this.fetchEthInternalTrx(fromBlock, toBlock),
this.fetchBlocks(fromBlock, toBlock)
]);
async fetchBlocksAndReceipts(fromBlock, toBlock) {
logger.info(`Fetching blocks info ${fromBlock}:${toBlock}`);
const blocks = await this.fetchBlocks(fromBlock, toBlock);
logger.info(`Fetching receipts of ${fromBlock}:${toBlock}`);
const receipts = await this.fetchReceipts(blocks.keys());

return [traces, blocks, receipts];
return [blocks, receipts];
}

async getPastEvents(fromBlock, toBlock, traces, blocks, receipts) {
async fetchData(fromBlock, toBlock) {
return await Promise.all([
this.fetchEthInternalTrx(fromBlock, toBlock),
this.fetchBlocksAndReceipts(fromBlock, toBlock)]);
}

transformEvents(fromBlock, toBlock, data) {
const [traces, [blocks, receipts]] = data;
let events = [];
if (fromBlock === 0) {
logger.info('Adding the GENESIS transfers');
events.push(...getGenesisTransfers(this.web3));
}

events.push(... await this.getPastTransferEvents(traces, blocks));
events.push(... await this.getPastTransactionEvents(blocks.values(), receipts));
events.push(...this.transformTransferEvents(traces, blocks));
events.push(...this.transformTransactionEvents(blocks.values(), receipts));
if (fromBlock <= DAO_HACK_FORK_BLOCK && DAO_HACK_FORK_BLOCK <= toBlock) {
logger.info('Adding the DAO hack transfers');
events = injectDAOHackTransfers(events);
Expand All @@ -124,54 +130,44 @@ class ETHWorker extends BaseWorker {
return events;
}

async getPastTransferEvents(traces, blocksMap) {
transformTransferEvents(traces, blocksMap) {
const result = [];

for (let i = 0; i < traces.length; i++) {
const block_timestamp = this.web3Wrapper.decodeTimestampFromBlock(blocksMap.get(traces[i]['blockNumber']));
result.push(decodeTransferTrace(traces[i], block_timestamp, this.web3Wrapper));
result.push(decodeTransferTrace(traces[i], block_timestamp, this.web3Wrapper));//TODO: Maybe push {blocknumbers: data}
}

return result;
}

async getPastTransactionEvents(blocks, receipts) {
transformTransactionEvents(blocks, receipts) {
const result = [];

for (const block of blocks) {
const decoded_transactions = this.feesDecoder.getFeesFromTransactionsInBlock(block, receipts);
const blockNumber = this.web3Wrapper.parseHexToNumber(block.number);
if (constants.IS_ETH && blockNumber >= constants.SHANGHAI_FORK_BLOCK) {
decoded_transactions.push(... await this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber));
decoded_transactions.push(...this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber));
}
result.push(...decoded_transactions);
//TODO: Maybe push {blocknumbers: data}
}

return result;
}
//TODO: If you have a [{#: data}] structure, you can check whether a given # simply has no data or is missing entirely
async makeQueueTask(interval) {
const data = await this.fetchData(interval.fromBlock, interval.toBlock);
const transformedData = this.transformEvents(interval.fromBlock, interval.toBlock, data);
transformedData.forEach((data) => this.buffer.push(data));
}

async work() {
const result = await nextIntervalCalculator(this);
if (!result.success) {
return [];
}
const intervals = await nextIntervalCalculator(this);
if (intervals.length === 0) return [];

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);
const [traces, blocks, receipts] = await this.fetchTracesBlocksAndReceipts(result.fromBlock, result.toBlock);
const events = await this.getPastEvents(result.fromBlock, result.toBlock, traces, blocks, receipts);

if (events.length > 0) {
stableSort(events, transactionOrder);
for (let i = 0; i < events.length; i++) {
events[i].primaryKey = this.lastPrimaryKey + i + 1;
}
for (const interval of intervals) this.queue.add(() => this.makeQueueTask(interval));

this.lastPrimaryKey += events.length;
}

this.lastExportedBlock = result.toBlock;

return events;
this.lastExportedBlock = Math.max(intervals[intervals.length - 1].toBlock, this.lastExportTime);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't make sense ATM, should be removed/refactored

}

async init() {
Expand Down
4 changes: 3 additions & 1 deletion blockchains/eth/lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ const ETH_WITHDRAWAL = 'withdrawal';
const LONDON_FORK_BLOCK = 12965000;
const SHANGHAI_FORK_BLOCK = 17034871;
const IS_ETH = parseInt(process.env.IS_ETH || '1');
const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts';
const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3');
const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '100');
const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts';
const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || 1);
const NODE_URL = process.env.NODE_URL || process.env.PARITY_URL || 'http://localhost:8545/';
const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30');

module.exports = {
BLOCK_INTERVAL,
CONFIRMATIONS,
NODE_URL,
MAX_CONCURRENT_REQUESTS,
LOOP_INTERVAL_CURRENT_MODE_SEC,
BURN_ADDRESS,
ETH_WITHDRAWAL,
Expand Down
2 changes: 1 addition & 1 deletion blockchains/eth/lib/genesis_transfers.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const GENESIS_TRANSFERS = fs.readFileSync(path.resolve(__dirname) + '/ethereum_g

const GENESIS_TIMESTAMP = 1438269973;

exports.getGenesisTransfers = function(web3) {
exports.getGenesisTransfers = function (web3) {
const result = [];
GENESIS_TRANSFERS.forEach((transfer) => {
const [id, from, to, amount] = transfer;
Expand Down
65 changes: 25 additions & 40 deletions blockchains/eth/lib/next_interval_calculator.js
Original file line number Diff line number Diff line change
@@ -1,56 +1,41 @@
const constants = require('./constants');


function isNewBlockAvailable(worker) {
return worker.lastExportedBlock < worker.lastConfirmedBlock;
}
/**
* Return the next interval to be fetched.
* NOTE: this method modifies the member variables of its argument
*
* @param {*} worker A worker object, member variables would be modified
* @returns An object like so:
* {
* success: Boolean,
* fromBlock: Integer,
* toBlock: Integer
* }
* A function that returns the appropriate array of intervals,
* depending on the progress that the worker's made.
* If the exporter's caught up, we check for a new block. We then check whether the Node
* returns a valid block (sometimes the Node returns an early block, like 3 for example).
* We don't want to get the new blocks right away, so we set a sleep variable. On the next iteration
* the function will return the appropriate array of intervals.
* @param {BaseWorker} worker A worker instance, inheriting the BaseWorker class.
* @returns {Array} An array of intervals.
*/
async function nextIntervalCalculator(worker) {
// Check if we need to ask the Node for new Head block. This is an optimization to skip this call when the exporter
// is behind the last seen Head anyways.
const firstNewBlockCheck = isNewBlockAvailable(worker);
if (!firstNewBlockCheck) {
// On the previous cycle we closed the gap to the head of the blockchain.
// Check if there are new blocks now.
if (worker.lastExportedBlock >= worker.lastConfirmedBlock) {
const newConfirmedBlock = await worker.web3.eth.getBlockNumber() - constants.CONFIRMATIONS;
if (newConfirmedBlock > worker.lastConfirmedBlock) {
// The Node has progressed
if (worker.lastConfirmedBlock < newConfirmedBlock) {
worker.lastConfirmedBlock = newConfirmedBlock;
}
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
return [];
}

if (firstNewBlockCheck || isNewBlockAvailable(worker)) {
// If data was available without asking with Node, we are catching up and should come back straight away
if (firstNewBlockCheck) {
worker.sleepTimeMsec = 0;
}
else {
// If data became available only after asking the Node, we are close to the Head, come back later
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
}
worker.sleepTimeMsec = 0;
const progressDifference = worker.lastConfirmedBlock - worker.lastExportedBlock;
const maxInterval = constants.MAX_CONCURRENT_REQUESTS * constants.BLOCK_INTERVAL;
let intervalArrayLength;
if (progressDifference < maxInterval) {
intervalArrayLength = Math.ceil(progressDifference / constants.BLOCK_INTERVAL);
} else {
intervalArrayLength = constants.MAX_CONCURRENT_REQUESTS;
}

return Array.from({ length: intervalArrayLength }, (_, i) => {
return {
success: true,
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + constants.BLOCK_INTERVAL, worker.lastConfirmedBlock)
fromBlock: worker.lastExportedBlock + constants.BLOCK_INTERVAL * i + 1,
toBlock: Math.min(worker.lastExportedBlock + constants.BLOCK_INTERVAL * (i + 1), worker.lastConfirmedBlock)
};
}
else {
// The Node has not progressed
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
return { success: false };
}
});
}

module.exports = {
Expand Down
Loading