Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Feb 20, 2024
1 parent 7b5b244 commit 53662bf
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
26 changes: 17 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,22 @@ class Main {
}

async waitOnStoreEvents() {
const bufferCopy = this.taskManager.retrieveCompleted();
await this.exporter.storeEvents(bufferCopy);
this.lastProcessedPosition = {
primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey,
blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber
};
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);
if (this.taskManager.buffer.length > 0) {
const bufferCopy = this.taskManager.retrieveCompleted();
await this.exporter.storeEvents(bufferCopy);
this.lastProcessedPosition = {
primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey,
blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber
};
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);
}

if (this.taskManager.queue.isPaused) {
this.taskManager.queue.start();
this.taskManager.consequentTaskIndex = 0;
logger.info('Resuming the queue...');
}
}

async workLoop() {
Expand All @@ -102,7 +110,7 @@ class Main {
this.worker.lastExportTime = Date.now();

this.lastProcessedPosition = this.worker.getLastProcessedPosition();
if (this.taskManager.buffer.length > 0) await this.waitOnStoreEvents();
await this.waitOnStoreEvents();
this.updateMetrics();

if (this.shouldWork) {
Expand Down
2 changes: 2 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const CONFIG_PATH = process.env.CONFIG_PATH;
const START_BLOCK = parseInt(process.env.START_BLOCK || '0') - 1;
const EXPORT_BLOCKS_LIST = process.env.EXPORT_BLOCKS_LIST || false;
const PQUEUE_MAX_SIZE = parseInt(process.env.PQUEUE_MAX_SIZE || '100');
const MAX_TASK_DATA_KEYS = parseInt(process.env.PQUEUE_MAX_SIZE || '10');
const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || '1');
const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '50');
const START_PRIMARY_KEY = parseInt(process.env.START_PRIMARY_KEY || '-1');
Expand All @@ -17,6 +18,7 @@ module.exports = {
PQUEUE_MAX_SIZE,
START_PRIMARY_KEY,
EXPORT_BLOCKS_LIST,
MAX_TASK_DATA_KEYS,
EXPORT_TIMEOUT_MLS,
MAX_CONCURRENT_REQUESTS,
EXPORT_BLOCKS_LIST_MAX_INTERVAL
Expand Down
16 changes: 14 additions & 2 deletions lib/task_manager.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
const { logger } = require('./logger');
const { MAX_TASK_DATA_KEYS } = require('./constants');

class TaskManager {
constructor() {
this.queue;
this.buffer = [];
this.taskData = {};
this.taskIndex = 0;
this.consequentTaskIndex = 0;
this.currentFromBlock;
}

Expand All @@ -15,7 +18,16 @@ class TaskManager {
async initQueue(maxConcurrentRequests) {
const PQueue = (await import('p-queue')).default;
this.queue = new PQueue({ concurrency: maxConcurrentRequests });
this.queue.on('completed', ([interval, data]) => this.handleNewData(interval, data));
this.queue.on('completed', ([interval, data]) => {
this.consequentTaskIndex++;
this.handleNewData(interval, data);
if (this.consequentTaskIndex >= MAX_TASK_DATA_KEYS) {
if (!this.queue.isPaused) {
this.queue.pause();
logger.info('Pausing the queue...');
}
}
});
}

/**
Expand Down

0 comments on commit 53662bf

Please sign in to comment.