-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Abstract the parallelism idea into a single entity #178
base: master
Are you sure you want to change the base?
Conversation
index.js
Outdated
if (interval.success) { | ||
delete interval.success; | ||
this.taskManager.pushToQueue(() => this.worker.work(interval)); | ||
this.worker.lastExportedBlock = interval.toBlock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the reason for moving this here is that the work()
would execute out of order? Here it's too optimistic, the work is not done yet, just scheduled. Better leave this in the work()
logic as it was. You can change the setting to:
if (result.toBlock > this.lastExportedBlock) {
this.lastExportedBlock = result.toBlock;
}
this would still not be exact (as past intervals may be pending) but should still be a bit more accurate than incrementing it once the interval is scheduled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lastExportedBlock is only used for nextIntervalCalculator, meaning, it just gets its intervals from this
the work is not done yet, just scheduled
indeed, the tasks are just pushed with the calculated intervals, there's no guarantee, that the last one is finished, but lastExportedBlock doesn't overwrite any ZK state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then I suggest rename this field. lastExportedBlock
is not what it seems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lastExportedBlock is only used for nextIntervalCalculator
It is meant to be exposed to Prometheus, see updateMetrics()
index.js
Outdated
@@ -75,19 +84,27 @@ class Main { | |||
|
|||
async workLoop() { | |||
while (this.shouldWork) { | |||
if (this.taskManager.queue.size < constantsBase.PQUEUE_MAX_SIZE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better await on the promise returned by:
queue.onSizeLessThan(limit)
instead of returning from this method and then coming back after a timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think current implementation is better, the idea is to continue with kafka pushing, even if the queue is full. If I await the promise, I block the program, at least, as far as I understand
Returns a promise that settles when the queue size is less than the given limit: queue.size < limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just separate the things, and do not do them in a single loop. If you go back from this loop and wait for some magic number of milliseconds you are reducing the performance of your program. Also the Kafka writing, again do not do it in this loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the end of this loop there is a timeout (L 112) what should the timeout be when the above condition is false? You either set it to 0 and in effect busy loop until Pqueue resolves the promises or invent some number which would reduce your performance.
index.js
Outdated
this.worker.lastExportTime = Date.now(); | ||
|
||
this.updateMetrics(); | ||
this.lastProcessedPosition = this.worker.getLastProcessedPosition(); | ||
|
||
if (events && events.length > 0) { | ||
await this.exporter.storeEvents(events); | ||
if (this.taskManager.buffer && this.taskManager.buffer.length > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently the consumer of the buffer is part of the main loop. I think it would make sense to separate it. Make the buffer blocking (wrap it into a simple class where you can call await waitOnData()
) and then start the consumer and make it block on the buffer.
index.js
Outdated
async #initTaskManager(blockNumber) { | ||
this.taskManager = new TaskManager(blockNumber, constants.BLOCK_INTERVAL); | ||
await this.taskManager.initPQueue(MAX_CONCURRENT_REQUESTS); | ||
} | ||
|
||
#isWorkerSet() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's quite common for isSomething
methods to return bool
as an answer. This one returns void and confuses me a bit because it throws on error. which might be not the best strategy for the caller
index.js
Outdated
if (interval.success) { | ||
delete interval.success; | ||
this.taskManager.pushToQueue(() => this.worker.work(interval)); | ||
this.worker.lastExportedBlock = interval.toBlock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then I suggest rename this field. lastExportedBlock
is not what it seems.
index.js
Outdated
if (this.taskManager.buffer && this.taskManager.buffer.length > 0) { | ||
await this.exporter.storeEvents(this.taskManager.buffer); | ||
this.lastProcessedPosition = this.taskManager.lastProcessedPosition; | ||
this.taskManager.buffer = []; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm worried and almost sure that buffer may be modified by Task Manager between storeEvents
and clean
0739740
to
d9f47ea
Compare
lib/task_manager.js
Outdated
} | ||
|
||
clearQueue() { | ||
this.taskManager.queue.pause(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pause doesn't pause already in flight tasks. We clean queue next step which won't work if there's a long running task running
lib/task_manager.js
Outdated
|
||
#generateDefensiveCopy() { | ||
const bufferCopy = []; | ||
for (const data of this.buffer) bufferCopy.push(data); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
most probably we can use JS clone which should be faster and readable
blockchains/eth/eth_worker.js
Outdated
@@ -145,11 +146,10 @@ class ETHWorker extends BaseWorker { | |||
return result; | |||
} | |||
|
|||
async work() { | |||
async work(key) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You give this 'key' argument to the work()
method just to have it back. It does not need to know about this key, it does nothing with it. Wrap the call in a closure if you need to preserve it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't look really elegant atm, but I didn't think of another way to somehow set an order of the tasks beforehand
but I think I've thought of a way with queue.on('add'), which would be elegant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap the call in a closure if you need to preserve it.
Wouldn't what I suggested work? I am thinking this:
const key = "some key"
this.taskManager.pushToQueue(() => {
const result = this.worker.work()
return [key, result]
}
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, this is simple, yes. Thanks
return { | ||
success: true, | ||
fromBlock: worker.lastBufferedBlock + 1, | ||
toBlock: Math.min(worker.lastBufferedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function depends on some side effects to work correctly. The worker must modify worker.lastBufferedBlock
before the next call, this is imperative or same intervals would be created. I think the logic of the function can be improved in two ways:
- Break the 'isNewBlockAvailable' logic. It has nothing to do with the interval.
- Make the arguments which the function depend on more explicit. This would make it more obvious to the caller that it is up to him to provide updated 'from' value. The function needs to know two arguments (lastConfirmedBlock, toBlock).
lib/task_manager.js
Outdated
|
||
#shiftOriginalBuffer(shiftsCount) { | ||
while (shiftsCount >= 0) { | ||
this.buffer.shift(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's that shenanigan with shifting by one. Just erase the whole buffer by setting length to 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you do that, you risk losing data.
Let's say we have a buffer of [1,2,3,4]. You copy this. In the meantime, between the copy and the buffer deletion, another element is added at the back, let's say 5. Then you'll erase the whole buffer
Though, I think this could be improved with Array.slice, now that you say it. Maybe it'd be a bit more optimal. But a shenanigan is indeed needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the meantime, between the copy and the buffer deletion, another element is added at the back
What do you mean, NodeJS is single threaded, this is the whole proposition. Also if it wasn't (we were dealing with some other language) this approach to access the same data structure most probably wouldn't have been enough to give us thread safety.
lib/worker_base.js
Outdated
@@ -43,7 +43,7 @@ class WorkerBase { | |||
*/ | |||
getLastProcessedPosition() { | |||
return { | |||
blockNumber: this.lastExportedBlock, | |||
blockNumber: this.lastBufferedBlock, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's wrong. This is the state we would preserve in ZK from which we would recover on restart. Last buffered block is too optimistic, this is not yet in Kafka.
lib/task_manager.js
Outdated
while (this.taskData[this.lastPushedToBuffer]) { | ||
for (const data of this.taskData[this.lastPushedToBuffer]) { | ||
this.buffer.push(data); | ||
this.lastProcessedPosition = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is getting messy, we have 'lastProcessed' and 'lastBuffered'. However here you are mixing them here, the fact that something is eligible for going in the buffer doesn't make it any more 'processed', it still needs to be consumed from the Kafka side and send through the producer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need this member variable. The user of this class can see what the last value is once it calls retrieveCompleted
@@ -170,7 +170,7 @@ class CardanoWorker extends BaseWorker { | |||
} | |||
|
|||
async work() { | |||
const fromBlock = this.lastExportedBlock + 1; | |||
const fromBlock = this.lastBufferedBlock + 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a fan of all those renames. You resorted to renaming the 'last exported block' to 'last buffered block' to make it comply more easily with your change. But:
- All those other workers don't know what 'buffered block' is
- I would argue that 'last exported block' is cleaner in terms of semantics. The last block which got into Kafka. What is 'last buffered block'?
5708174
to
1980390
Compare
lib/task_manager.js
Outdated
pushToQueue(workTask) { | ||
this.queue.add(() => { | ||
const result = workTask(); | ||
const currIndex = this.taskIndex; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This I thought would be enough, but will update this. Seems this is not enough for copying.
eab81fc
to
8c5c1de
Compare
9ef36b0
to
0c72c36
Compare
index.js
Outdated
} | ||
await this.exporter.savePosition(this.lastProcessedPosition); | ||
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); | ||
if (this.taskManager.buffer.length > 0) await this.waitOnStoreEvents(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition on the size of a member variable of another object breaks encapsulation. Is this a performance optimization to spare us the Promise in some case? I would prefer to just call waitOnStoreEvents
every time and leave this check inside. If you want the performance optimization, maybe create a method on TaskManager
, called isDataAvailable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can't push [] to Kafka. Initially I forgot about this check, but then got some errors.
https://github.com/santiment/san-chain-exporter/blob/master/index.js#L86
We have it in master as well.
But maybe I'll move it into the function, as you say
lib/task_manager.js
Outdated
* @param {Array} newTransformedData | ||
*/ | ||
handleNewData(interval, newTransformedData) { | ||
this.taskData[interval.fromBlock] = { toBlock: interval.toBlock, data: newTransformedData }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think at this point we need to have some blocking mechanism to block the producers. Currently we have no way to throttle the tasks coming from the PQueue and we would run into an OOM if the consumer of this data (the logic writing into Kafka) is slower. Right?
3a2c05c
to
53662bf
Compare
29145ae
to
59ecbfb
Compare
lib/task_manager.js
Outdated
* pause the task generation for a bit. | ||
*/ | ||
#maybePauseQueue() { | ||
if (this.consequentTaskIndex >= MAX_TASK_DATA_KEYS) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You only increase this counter consequentTaskIndex
until it exceeds MAX_TASK_DATA_KEYS
, then you always pause. This behavior doesn't reflect the actual speed of the Kafka producer. What you should be doing instead is throttle based on the size of buffer
.
index.js
Outdated
@@ -95,6 +128,26 @@ class Main { | |||
} | |||
} | |||
|
|||
async workLoopV2() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this idea with the gradual migration.
index.js
Outdated
} | ||
|
||
async waitOnStoreEvents() { | ||
const buffer = this.taskManager.retrieveCompleted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In your current implementation some logic which is not native for the TaskManager has been put inside it (last exported block, primary keys). It is better to have more separation of concern. For example setting primary keys as per the logic of each blockchain is not native to it, ideally the TaskManager shouldn't even know which blockchain it is working on. Decorating the result data is something which you can do here.
lib/task_manager.js
Outdated
}; | ||
logger.info(`Initialized exporter with initial position ${JSON.stringify(lastProcessedPosition)}`); | ||
} | ||
this.lastExportedBlock = lastProcessedPosition.blockNumber; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for that logic to be encapsulated into the TaskManager
lib/task_manager.js
Outdated
* @param {Array} buffer The array that at the end of the primary function would result in the | ||
* combined data, accordingly updated with the primary keys | ||
*/ | ||
#updatePrimaryKeysPushToBuffer(events, buffer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this to happen here. The TaskManager can provide the data as it is and it can be decorated on the upper level.
index.js
Outdated
* @param {Array} buffer The array of events that would be accordingly updated with the primary keys. | ||
*/ | ||
applyPrimaryKeyLogic(buffer) { | ||
if (BLOCKCHAIN === 'eth') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can leverage polymorphism and have 'decorateWithPrimaryKeys' method in different workers, instead of moving blockchain specific logic in the index.js
.
lib/task_manager.js
Outdated
this.lastPrimaryKey += bufferLength; | ||
} | ||
|
||
isChangedLastExportedBlock(lastProcessedBlock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit of a weird method to compare two numbers. The more natural way would be to have a simple get method here and compare at the caller. Minor, your call.
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. | ||
* @returns {object} The interval, derived from the progress of the worker | ||
*/ | ||
function nextIntervalCalculatorV2(worker) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing the original:
function nextIntervalCalculator(worker) {
return {
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
};
}
to
function nextIntervalCalculator(lastExported, lastConfirmed, blockInterval) {
...
}
would allow you to reuse the same code. Also this is considered better design, as it more clearly denotes the intent of the function.
index.js
Outdated
@@ -29,9 +32,22 @@ class Main { | |||
.catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); }); | |||
} | |||
|
|||
initLastProcessedPosition(lastRecoveredPosition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic used to be part of the worker, what's the gain in moving it here? It seems like half the logic has remained there and half is here.
index.js
Outdated
@@ -70,7 +95,41 @@ class Main { | |||
metrics.currentBlock.set(this.worker.lastConfirmedBlock); | |||
metrics.requestsCounter.inc(this.worker.getNewRequestsCount()); | |||
metrics.requestsResponseTime.observe(new Date() - this.worker.lastRequestStartTime); | |||
metrics.lastExportedBlock.set(this.worker.lastExportedBlock); | |||
if (BLOCKCHAIN === 'eth') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why this logic can't be unified. The 'eth' case can still have this.worker.lastExportedBlock
set to the actual block that is written in Kafka.
lib/task_manager.js
Outdated
* Used when updating the position of the exporter in ZooKeeper. | ||
* @returns An object of the last exported block and last used primary key | ||
*/ | ||
getLastProcessedPosition() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this here, you don't need neither values in the TM?
lib/task_manager.js
Outdated
/** | ||
* @param {number} bufferLength The length of the buffer of updated events | ||
*/ | ||
updateLastPrimaryKey(bufferLength) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned before TM doesn't need to know about primary keys.
3a95c28
to
09cfb87
Compare
index.js
Outdated
@@ -73,6 +86,24 @@ class Main { | |||
metrics.lastExportedBlock.set(this.worker.lastExportedBlock); | |||
} | |||
|
|||
async waitOnStoreEvents(buffer) { | |||
if (buffer.length > 0) { | |||
this.worker.decorateWithPrimaryKeys(buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The decoration can happen before calling this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bit strange that we modify function argument. could we use return value to make this code a bit easier to read?
index.js
Outdated
@@ -73,6 +86,24 @@ class Main { | |||
metrics.lastExportedBlock.set(this.worker.lastExportedBlock); | |||
} | |||
|
|||
async waitOnStoreEvents(buffer) { | |||
if (buffer.length > 0) { | |||
this.worker.decorateWithPrimaryKeys(buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
decorateWithPrimaryKeys
should be declared in the base class and implemented for all workers.
index.js
Outdated
constantsBase.BLOCK_INTERVAL); | ||
this.worker.lastQueuedBlock = interval.toBlock; | ||
intervals.push(interval); | ||
if (interval.toBlock - interval.fromBlock + 1 < constantsBase.BLOCK_INTERVAL) break; //We've caught up with the head, no new intervals needed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Why the
+1
? - Seems the easier to follow logic is to break when
(intrval.fromBlock - interval.toBlock) <= 0
index.js
Outdated
|
||
const workerContext = await analyzeWorkerContext(this.worker); | ||
setWorkerSleepTime(this.worker, workerContext); | ||
if (workerContext === NO_WORK_SLEEP) return [];//TODO: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't forget to revisit this case, would exit atm.
index.js
Outdated
} | ||
} | ||
|
||
async updatePosition(lastPrimaryKey, lastExportedBlock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like you do not update the this.lastExportedBlock
member variable defined in the base worker. It is used for metrics purposes.
test/eth/worker.spec.js
Outdated
}); | ||
|
||
|
||
it('test primary key assignment', async function () { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was because of my previous version, that delegated the primary key assignment elsewhere. I'll bring it back, though probably reworked, as now it's not done in the work()
test/lib/task_manager.spec.js
Outdated
|
||
describe('TaskManager', () => { | ||
it('constructor initializes corresponding variables', () => { | ||
const taskManager = new TaskManager(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better unit test the static method, which the rest of the code is using. In other languages the constructor would have been private
but I guess this is difficult to achieve in JS.
test/lib/task_manager.spec.js
Outdated
taskManager.consequentTaskIndex = MAX_TASK_DATA_KEYS; | ||
taskManager.queue.pause(); | ||
|
||
taskManager.restartQueueIfNeeded(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this defined?
test/lib/task_manager.spec.js
Outdated
it('retrieveCompleted returns only the first part of the keys data when holes are present in `taskData`', async () => { | ||
const CONCURRENCY = 1; | ||
const taskManager = await TaskManager.create(CONCURRENCY); | ||
taskManager.initPosition({ blockNumber: 0, primaryKey: 0 }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm.. where is TaskManager::initPosition()
defined, can't find it.
test/lib/task_manager.spec.js
Outdated
const emmitedTaskData = [{ fromBlock: 1, toBlock: 10 }, exampleEventsData]; | ||
taskManager.queue.emit('completed', emmitedTaskData); | ||
|
||
expect(taskManager.queue.isPaused).to.eq(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that test reflects the implementation, I think all the TaskManager tests need revisiting. In the current implementation the TaskManager is a nice standalone implementation which can be black box tested.
lib/task_manager.js
Outdated
@@ -0,0 +1,84 @@ | |||
class TaskManager { | |||
constructor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you make it private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make it private, I'd need to migrate this to Typescript, but I'd do that in a following PR. JS doesn't allow private classes
lib/task_manager.js
Outdated
this.taskData[this.lastTaskIndex] = taskMetadata; | ||
const taskIndex = this.lastTaskIndex; | ||
this.queue.add(async () => { | ||
const result = await taskMetadata.lambda(taskMetadata.interval); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we pass taskMetadata
to labda as is. I don't think this method should be aware of .interval
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But you do need them. You'd need to know, which is the last handled lastExportedBlock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, let's do some integration tests before merging.
Description
Why Task Manager
The great benefit of the TaskManager class is that it deals with concurrent worker work methods, whilst at the same time keeping a constant load on Kafka. As time goes by, a lot of the functionality would be transfered from the worker classes to the TaskManager class, because of the need for the TM to take care of the order, primary keys, etc.
I've decided to split the functionality into two versions, one working as before and one - the "new way". I'd need to think of a way to unify the workers' way of calculating intervals and returning the events in a single fashion, but that I thought would take some time. The XRP and Cardano workers in particular were a bit odd for this unification process, at least at first glance. On top of that, I think it's best if we didn't change all of the exporters' logic and we work on iterations.
That being said, I want to add tests for the separate versions, but that will happen in the next few days (w.26.02)
A lot of the stuff from my previous commits I've simplified to a great extent, no need for a buffer [] property, with that, I've made the "retrieveCompleted" function a bit simpler. A lot of functions were added for encapsulation's sake, it looks better too.
Primary Key logic was moved into TM, because of the worker not being able to keep the order, same as with
lastExportedBlock
. For now, I just copied the logic from the eth worker, but in the future, I'd like to somehow unify this under the #pushToBuffer function.