Adding parallelism to the ETH, ERC20, MATIC pipelines #162

Open · spiderjako wants to merge 11 commits into master from the add-parallelism-eth branch
Conversation

spiderjako (Contributor):

No description provided.

spiderjako added the WIP label on Nov 18, 2023
spiderjako force-pushed the add-parallelism-eth branch 4 times, most recently from d27322b to 059c34f on November 20, 2023 12:50
spiderjako removed the WIP label on Nov 28, 2023
spiderjako marked this pull request as ready for review on November 28, 2023 07:21
spiderjako changed the title from "Adding parallelism to the ETH pipeline" to "Adding parallelism to the ETH, ERC20, MATIC pipelines" on Nov 28, 2023
const result = constants.EXPORT_BLOCKS_LIST ?
this.getBlocksListInterval() :
await nextIntervalCalculator(this);
const requestIntervals = await nextIntervalCalculator(this);
Contributor:
Are you removing the 'export block list' feature?

Contributor (Author):
I've completely missed that. For the time being, though, I'd remove this part of the code and rework it later on, since it needs to be adapted to fit this functionality. WDYT?

Contributor:
I agree, let's integrate it again in a follow-up PR so as not to slow this one down. It would be very similar to what you currently do in nextIntervalCalculator: generate a sequence of intervals with the parallelism in mind.
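For reference, a rough sketch of what that follow-up might look like: splitting a configured blocks list into interval batches sized to the allowed parallelism, mirroring the nextIntervalCalculator idea. All names and shapes below are assumptions for illustration, not the exporter's actual code.

// Hypothetical sketch: turn a blocks list into interval batches sized to the parallelism.
// `blockRanges` and the batch shape are assumptions; the batch size mirrors MAX_CONCURRENT_REQUESTS.
function blocksListToIntervalBatches(blockRanges, maxConcurrentRequests) {
  // blockRanges: e.g. [[100, 199], [500, 599], [700, 799]]
  const intervals = blockRanges.map(([fromBlock, toBlock]) => ({ fromBlock, toBlock }));
  const batches = [];
  for (let i = 0; i < intervals.length; i += maxConcurrentRequests) {
    batches.push(intervals.slice(i, i + maxConcurrentRequests));
  }
  return batches;
}

// blocksListToIntervalBatches([[100, 199], [500, 599], [700, 799]], 2)
// => [ [ {fromBlock:100,...}, {fromBlock:500,...} ], [ {fromBlock:700,...} ] ]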

let retryIntervalMs = 0;
while (retries < constants.MAX_RETRIES) {
try {
const response = await this.ethClient.request(...params);
Contributor:
Should (...params) be spread again? I thought request would work with an array.

Contributor (Author):
I think it would work with an array if it works with a batch of requests. Technically, [] is a batch, but I'd have to rework how the parameters are passed. Currently we have:

('trace_filter', [{
  fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
  toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
}])
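For what it's worth, a toy illustration of the spread-versus-array distinction being discussed; request() here is a stand-in, not the real ethClient API.

// Stand-in request() just to show what each call shape would receive.
function request(method, params) {
  return { method, params };
}

const callArgs = ['trace_filter', [{ fromBlock: '0x64', toBlock: '0xc8' }]];

request(...callArgs); // spread: method = 'trace_filter', params = [{ fromBlock, toBlock }]
// request(callArgs); // no spread: the whole array arrives as `method`, so the parameter
//                    // handling would need the rework mentioned above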

retryIntervalMs += (2000 * retries);
logger.error(`${params[0]} failed. Reason: ${response.error}. Retrying for ${retries} time`);
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
continue;
Contributor:
Wouldn't an if-else be cleaner than continue?
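A minimal sketch of the if/else shape being suggested, with stand-in names rather than the worker's actual code; the loop mirrors the retry logic quoted above.

// Sketch only: the retry loop restructured so the success path is the else branch,
// removing the need for `continue`. `sendRequest` is a placeholder for the real call.
async function fetchWithRetry(sendRequest, maxRetries) {
  let retries = 0;
  let retryIntervalMs = 0;
  while (retries < maxRetries) {
    const response = await sendRequest();
    if (response.error || response.result === null) {
      retries++;
      retryIntervalMs += 2000 * retries;
      await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
    } else {
      return response;
    }
  }
  return null;
}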

const response = await this.ethClient.request(...params);
if (response.error || response.result === null) {
retries++;
retryIntervalMs += (2000 * retries);
Contributor:
Use a named constant for 2000, e.g. constants.BACKOFF_RETRY_STEP.
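As a sketch of that suggestion (the constant name comes from the comment above and is not yet in the project's constants):

// Hypothetical constant replacing the magic number 2000.
const BACKOFF_RETRY_STEP = 2000; // milliseconds added per retry

// Mirrors `retryIntervalMs += (2000 * retries)` with the constant substituted.
function nextRetryDelayMs(previousDelayMs, retries) {
  return previousDelayMs + BACKOFF_RETRY_STEP * retries;
}

// nextRetryDelayMs(0, 1) === 2000; nextRetryDelayMs(2000, 2) === 6000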

blockchains/eth/eth_worker.js (resolved comment hidden)
try {
const response = await this.ethClient.request(...params);
if (response.error || response.result === null) {
retries++;
Contributor:
Should we even wait if the retry limit has already been reached at this point? Think MAX_RETRIES=0.

WonderBeat (Contributor), Nov 28, 2023:
We should definitely wait. I suggest using a retry library; it's error-prone to write one yourself.

Contributor (Author):
What do you mean?

Contributor:
I would expect MAX_RETRIES to specify the number of retries, that is, the number of attempts to re-send after the initial send. Instead, if it is 0, we would not send the request at all; if it is 1, we would send once and wait on error, but then not retry. The logic seems a bit off to me.
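A small sketch of the semantics described above, where MAX_RETRIES counts re-sends after the initial attempt (so 0 still sends once); sendRequest is a stand-in, not the project's API.

// Initial attempt plus up to maxRetries re-sends; no pointless wait after the final attempt.
async function requestWithRetries(sendRequest, maxRetries) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    const response = await sendRequest();
    if (!response.error && response.result !== null) {
      return response;
    }
    if (attempt < maxRetries) {
      // back off only when another attempt will actually follow
      await new Promise((resolve) => setTimeout(resolve, 2000 * (attempt + 1)));
    }
  }
  throw new Error('Request failed after all retries');
}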

Comment on lines 48 to 53
retries++;
retryIntervalMs += (2000 * retries);
logger.error(
`Try block in ${params[0]} failed. Reason: ${err.toString()}. Waiting ${retryIntervalMs} and retrying for ${retries} time`
);
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
Contributor:
This logic seems to be the same as above; could it be moved to a waitOnError() method?
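A rough sketch of what such a helper might look like; console.error stands in for the project's logger, and the signature is an assumption.

// Hypothetical waitOnError(): consolidates the backoff bump, the log line and the sleep.
async function waitOnError(methodName, reason, retries, retryIntervalMs) {
  const nextIntervalMs = retryIntervalMs + 2000 * retries;
  console.error(`${methodName} failed. Reason: ${reason}. Waiting ${nextIntervalMs} ms before retry ${retries}`);
  await new Promise((resolve) => setTimeout(resolve, nextIntervalMs));
  return nextIntervalMs; // caller stores this as the new retryIntervalMs
}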

Contributor (Author):
I'd leave it as it is and rework both this and the UTXO version in another PR; they are currently based on essentially the same idea, and maybe I'll even combine them. Right now the log messages point to different errors: one logs the response's error, the other logs a caught exception.

worker.sleepTimeMsec = 0;
const progressDifference = worker.lastConfirmedBlock - worker.lastExportedBlock;
const maxInterval = constants.MAX_CONCURRENT_REQUESTS * constants.BLOCK_INTERVAL;
let intervalArrayLength;
Contributor:
Can be made const with the ternary operator.

spiderjako (Contributor, Author), Nov 28, 2023:
Could be, but IMO it looks cleaner this way; with a ternary it would be a bit too bloated. I like to flex with ternaries, but I realised a few weeks back that I don't always have to. I also don't think it's a big deal that it isn't a const, as it's passed as an argument on the next line and then the function ends.

Contributor:
It's not about flexing; const variables make it easier to reason about the code. Minor, of course; your call.
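For completeness, the const-with-ternary shape being discussed might look roughly like this; the branch values are assumptions based on the snippet above, not the actual nextIntervalCalculator logic.

// Illustrative only: `let intervalArrayLength` replaced by a single const expression.
function intervalCount(progressDifference, blockInterval, maxConcurrentRequests) {
  const maxInterval = maxConcurrentRequests * blockInterval;
  return progressDifference >= maxInterval
    ? maxConcurrentRequests
    : Math.ceil(progressDifference / blockInterval);
}

// intervalCount(250, 100, 5) === 3; intervalCount(1000, 100, 5) === 5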

WonderBeat (Contributor) left a comment:
We've implemented fork-join for a streaming task. It works, but it doesn't fully utilize our bottleneck, the blockchain node. On each iteration we spawn N concurrent requests and wait for all of them, which means we wait until the slowest request in the batch completes; that request may be retried several times before it finishes, and we still wait for it.
Sorry, but I believe this is not the best design for a data-transferring app.
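A sketch of the kind of alternative this hints at: a fixed pool of workers that each pull the next interval as soon as they finish, so one slow or retried request does not stall a whole batch. Names are illustrative, not the exporter's actual API.

// Bounded-concurrency pool: `concurrency` workers race through the interval list.
// Results stay ordered by interval index, so they can still be emitted sequentially.
async function runWithWorkerPool(intervals, concurrency, fetchInterval) {
  const results = new Array(intervals.length);
  let next = 0;
  async function worker() {
    while (next < intervals.length) {
      const index = next++; // safe: single-threaded event loop, no await between read and increment
      results[index] = await fetchInterval(intervals[index]);
    }
  }
  await Promise.all(Array.from({ length: concurrency }, worker));
  return results;
}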

await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
continue;
}
return response;
Contributor:
Am I right that this code path returns a data structure while the failure path returns a Promise? That looks odd.
