From 91c35786532558b5329820eae1d4f0a7f605db2b Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Wed, 10 Jul 2024 10:28:45 +0300 Subject: [PATCH 1/8] Break 'Exporter' into KafkaStorage and ZookeeperState classes --- src/blockchains/erc20/erc20_worker.ts | 4 +- src/blockchains/utxo/utxo_worker.ts | 4 +- src/e2e/producer-transaction.spec.ts | 30 ++--- src/lib/kafka_storage.ts | 169 +++----------------------- src/lib/worker_base.ts | 4 +- src/lib/zookeeper_state.ts | 110 +++++++++++++++++ src/main.ts | 38 +++--- src/test/index.test.ts | 120 +++++++++--------- 8 files changed, 234 insertions(+), 245 deletions(-) create mode 100644 src/lib/zookeeper_state.ts diff --git a/src/blockchains/erc20/erc20_worker.ts b/src/blockchains/erc20/erc20_worker.ts index a3f1e789..21f40e69 100644 --- a/src/blockchains/erc20/erc20_worker.ts +++ b/src/blockchains/erc20/erc20_worker.ts @@ -1,6 +1,6 @@ 'use strict'; import { logger } from '../../lib/logger'; -import { Exporter } from '../../lib/kafka_storage'; +import { KafkaStorage } from '../../lib/kafka_storage'; import { constructRPCClient } from '../../lib/http_client'; import { extendEventsWithPrimaryKey } from './lib/extend_events_key'; import { ContractOverwrite, changeContractAddresses, extractChangedContractAddresses } from './lib/contract_overwrite'; @@ -59,7 +59,7 @@ export class ERC20Worker extends BaseWorker { this.allOldContracts = []; } - async init(exporter?: Exporter) { + async init(exporter?: KafkaStorage) { this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS; if (this.settings.EXPORT_BLOCKS_LIST) { diff --git a/src/blockchains/utxo/utxo_worker.ts b/src/blockchains/utxo/utxo_worker.ts index af1c5039..1ec40de4 100644 --- a/src/blockchains/utxo/utxo_worker.ts +++ b/src/blockchains/utxo/utxo_worker.ts @@ -2,7 +2,7 @@ import { logger } from '../../lib/logger'; import { constructRPCClient } from '../../lib/http_client'; import { BaseWorker } from '../../lib/worker_base'; -import { Exporter } from '../../lib/kafka_storage'; +import { KafkaStorage } from '../../lib/kafka_storage'; import { HTTPClientInterface } from '../../types'; @@ -33,7 +33,7 @@ export class UTXOWorker extends BaseWorker { this.client = constructRPCClient(this.NODE_URL, this.RPC_USERNAME, this.RPC_PASSWORD, this.DEFAULT_TIMEOUT); } - async init(exporter: Exporter) { + async init(exporter: KafkaStorage) { const blockchainInfo = await this.sendRequestWithRetry('getblockchaininfo', []); this.lastConfirmedBlock = blockchainInfo.blocks - this.CONFIRMATIONS; await exporter.initPartitioner((event: any) => event['height']); diff --git a/src/e2e/producer-transaction.spec.ts b/src/e2e/producer-transaction.spec.ts index 89b74a37..548583c3 100644 --- a/src/e2e/producer-transaction.spec.ts +++ b/src/e2e/producer-transaction.spec.ts @@ -1,4 +1,4 @@ -import { Exporter } from '../lib/kafka_storage'; +import { KafkaStorage } from '../lib/kafka_storage'; import Kafka from 'node-rdkafka'; const KAFKA_URL: string = assertStringEnv(process.env.KAFKA_URL); const KAFKA_TOPIC: string = assertStringEnv(process.env.KAFKA_TOPIC); @@ -81,26 +81,26 @@ class TestConsumer { describe('Producer transactions', function () { - let exporter: Exporter; + let kafkaStorage: KafkaStorage; let testConsumer: TestConsumer; let num_messages_test = 3; beforeEach(function (done) { this.timeout(20000); - exporter = new Exporter('test-exporter', true, KAFKA_TOPIC); - exporter.connect().then(() => { + kafkaStorage = new KafkaStorage('test-exporter', true, KAFKA_TOPIC); + kafkaStorage.connect().then(() => { testConsumer = new TestConsumer(KAFKA_TOPIC, num_messages_test); done(); }); }); - afterEach(function (done) { + afterEach(async function (done) { this.timeout(10000); - exporter.disconnect(() => { - testConsumer.disconnect(function () { - done(); - }); + await kafkaStorage.disconnect() + + testConsumer.disconnect(function () { + done(); }); }); @@ -109,21 +109,21 @@ describe('Producer transactions', function () { await testConsumer.waitSubscribed(); - await exporter.initTransactions(); - await exporter.beginTransaction(); + await kafkaStorage.initTransactions(); + await kafkaStorage.beginTransaction(); // Do a small delay before starting writing messages, otherwise the consumer is missing them. // This should not really be needed, because we have received the 'subscribed' event in the // consumer but there is something I am missing. setTimeout(async function () { for (let i = 0; i < num_messages_test; i++) { - exporter.sendDataWithKey({ + kafkaStorage.sendDataWithKey({ timestamp: 10000000, iso_date: new Date().toISOString(), key: 1 }, 'key', null); } - await exporter.commitTransaction(); + await kafkaStorage.commitTransaction(); }, 2000); await testConsumer.waitData(); @@ -132,7 +132,7 @@ describe('Producer transactions', function () { it('using the \'storeEvents\' function should begin and commit a transaction', async function () { // We need the huge timeout because starting and closing a transaction takes around 1 sec this.timeout(10000); - await exporter.initTransactions(); + await kafkaStorage.initTransactions(); const testEvent = { 'contract': '0xdac17f958d2ee523a2206206994597c13d831ec7', @@ -151,7 +151,7 @@ describe('Producer transactions', function () { setTimeout(async function () { for (let i = 0; i < num_messages_test; i++) { - await exporter.storeEvents([testEvent], false); + await kafkaStorage.storeEvents([testEvent], false); } }, 1000); diff --git a/src/lib/kafka_storage.ts b/src/lib/kafka_storage.ts index ed6dd44f..5a97d1f1 100644 --- a/src/lib/kafka_storage.ts +++ b/src/lib/kafka_storage.ts @@ -1,14 +1,10 @@ import crypto from 'crypto'; -import Kafka, { LibrdKafkaError, ProducerGlobalConfig } from 'node-rdkafka'; +import Kafka, { ProducerGlobalConfig } from 'node-rdkafka'; import { BLOCKCHAIN } from './constants'; -import ZookeeperClientAsync from './zookeeper_client_async'; import { log_according_to_syslog_level, logger, SYSLOG_LOG_LEVEL } from './logger'; const ZOOKEEPER_URL: string = process.env.ZOOKEEPER_URL || 'localhost:2181'; -const ZOOKEEPER_RETRIES: number = parseInt(process.env.ZOOKEEPER_RETRIES || '0'); -const ZOOKEEPER_SPIN_DELAY: number = parseInt(process.env.ZOOKEEPER_SPIN_DELAY || '1000'); -const ZOOKEEPER_SESSION_TIMEOUT: number = parseInt(process.env.ZOOKEEPER_SESSION_TIMEOUT || '30000'); const FORMAT_HEADER: string = 'format=json;'; const RDKAFKA_DEBUG: string | null = process.env.RDKAFKA_DEBUG || null; @@ -19,17 +15,6 @@ const BUFFERING_MAX_MESSAGES: number = parseInt(process.env.BUFFERING_MAX_MESSAG const TRANSACTIONS_TIMEOUT_MS: number = parseInt(process.env.TRANSACTIONS_TIMEOUT_MS || '60000'); const KAFKA_MESSAGE_MAX_BYTES: number = parseInt(process.env.KAFKA_MESSAGE_MAX_BYTES || '10485760'); -process.on('unhandledRejection', (reason: unknown, p: Promise): void => { - // Otherwise unhandled promises are not possible to trace with the information logged - if (reason instanceof Error) { - logger.error('Unhandled Rejection at: ', p, 'reason:', reason, 'error stack:', (reason as Error).stack); - } - else { - logger.error('Unhandled Rejection at: ', p, 'reason:', reason); - } - process.exit(1); -}); - /** * A class to pick partition for an event. */ @@ -99,11 +84,10 @@ function castCompression(compression: string): 'none' | 'gzip' | 'snappy' | 'lz4 throw new Error(`Invalid compression value: ${compression}`); } -export class Exporter { +export class KafkaStorage { private readonly exporter_name: string; private readonly producer: Kafka.Producer; private readonly topicName: string; - private readonly zookeeperClient: ZookeeperClientAsync; private partitioner: Partitioner | null; constructor(exporter_name: string, transactional: boolean, topicName: string) { @@ -139,14 +123,6 @@ export class Exporter { log_according_to_syslog_level(log.severity, log.fac, log.message); }); - this.zookeeperClient = new ZookeeperClientAsync(ZOOKEEPER_URL, - { - sessionTimeout: ZOOKEEPER_SESSION_TIMEOUT, - spinDelay: ZOOKEEPER_SPIN_DELAY, - retries: ZOOKEEPER_RETRIES - } - ); - this.partitioner = null; } @@ -162,20 +138,13 @@ export class Exporter { /** * @returns {Promise} Promise, resolved on connection completed. */ - async connect() { + async connect(): Promise { logger.info(`Connecting to zookeeper host ${ZOOKEEPER_URL}`); - try { - await this.zookeeperClient.connectAsync(); - } - catch (ex) { - console.error('Error connecting to Zookeeper: ', ex); - throw ex; - } logger.info(`Connecting to kafka host ${KAFKA_URL}`); - const promise_result = new Promise((resolve, reject) => { - this.producer.on('ready', resolve); + const promise_result = new Promise((resolve, reject) => { + this.producer.on('ready', () => resolve()); this.producer.on('event.error', reject); // The user can provide a callback for delivery reports with the // dedicated method 'subscribeDeliveryReports'. @@ -190,131 +159,23 @@ export class Exporter { } /** - * Disconnect from Zookeeper and Kafka. + * Disconnect from Kafka. * This method is completed once the callback is invoked. */ - disconnect(callback?: () => void) { - logger.info(`Disconnecting from zookeeper host ${ZOOKEEPER_URL}`); - this.zookeeperClient.closeAsync().then(() => { - if (this.producer.isConnected()) { - logger.info(`Disconnecting from kafka host ${KAFKA_URL}`); - this.producer.disconnect(callback); - } - else { - logger.info(`Producer is NOT connected to kafka host ${KAFKA_URL}`); - } - }); - } - - async getLastPosition() { - if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) { - const previousPosition = await this.zookeeperClient.getDataAsync( - this.zookeeperPositionNode - ); - - try { - if (Buffer.isBuffer(previousPosition && previousPosition.data)) { - const value = previousPosition.data.toString('utf8'); - - if (value.startsWith(FORMAT_HEADER)) { - return JSON.parse(value.replace(FORMAT_HEADER, '')); - } else { - return previousPosition.data; - } - } - } catch (err) { - logger.error(err); - } - } - - return null; - } - - async getLastBlockTimestamp() { - if (await this.zookeeperClient.existsAsync(this.zookeeperTimestampNode)) { - const previousPosition = await this.zookeeperClient.getDataAsync( - this.zookeeperTimestampNode - ); - - try { - if (Buffer.isBuffer(previousPosition && previousPosition.data)) { - const value = previousPosition.data.toString('utf8'); - - if (value.startsWith(FORMAT_HEADER)) { - return JSON.parse(value.replace(FORMAT_HEADER, '')); - } else { - return previousPosition.data; - } - } - } catch (err) { - logger.error(err); - } + async disconnect(): Promise { + if (!this.producer.isConnected()) { + logger.info(`Producer is NOT connected to kafka host ${KAFKA_URL}`); + return; } - return null; - } - - async savePosition(position: object) { - if (typeof position !== 'undefined') { - const newNodeValue = Buffer.from( - FORMAT_HEADER + JSON.stringify(position), - 'utf-8' - ); - - if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) { - return this.zookeeperClient.setDataAsync( - this.zookeeperPositionNode, - newNodeValue - ); - } else { - return this.zookeeperClient.mkdirpAsync( - this.zookeeperPositionNode, - newNodeValue - ); - } - } - } + logger.info(`Disconnecting from kafka host ${KAFKA_URL}`); + const promise_result = new Promise((resolve, reject) => { + this.producer.disconnect(() => resolve()); + }) - async saveLastBlockTimestamp(blockTimestamp: number) { - if (typeof blockTimestamp !== 'undefined') { - const newNodeValue = Buffer.from( - FORMAT_HEADER + JSON.stringify(blockTimestamp), - 'utf-8' - ); - - if (await this.zookeeperClient.existsAsync(this.zookeeperTimestampNode)) { - return this.zookeeperClient.setDataAsync( - this.zookeeperTimestampNode, - newNodeValue - ); - } else { - return this.zookeeperClient.mkdirpAsync( - this.zookeeperTimestampNode, - newNodeValue - ); - } - } + await promise_result; } - async sendData(events: Array) { - if (events.constructor !== Array) { - events = [events]; - } - - events = events.map( - event => (typeof event === 'object' ? JSON.stringify(event) : event) - ); - events.forEach(event => { - this.producer.produce(this.topicName, null, Buffer.from(event)); - }); - - return new Promise((resolve, reject) => - this.producer.flush(KAFKA_FLUSH_TIMEOUT, (err: LibrdKafkaError) => { - if (err) return reject(err); - resolve(); - }) - ); - } async sendDataWithKey(events: object | Array, keyField: string, signalRecordData: object | null) { const arrayEvents: Array = (events.constructor !== Array) ? [events] : events diff --git a/src/lib/worker_base.ts b/src/lib/worker_base.ts index 28f76758..d547544a 100644 --- a/src/lib/worker_base.ts +++ b/src/lib/worker_base.ts @@ -1,6 +1,6 @@ 'use strict'; import { logger } from './logger'; -import { Exporter } from './kafka_storage'; +import { KafkaStorage } from './kafka_storage'; import { ExporterPosition } from '../types' export class BaseWorker { @@ -34,7 +34,7 @@ export class BaseWorker { throw new Error('"work" method need to be overriden'); } // To be implemented on inheritance. - async init(_exporter: Exporter) { + async init(_exporter: KafkaStorage) { throw new Error('"init" method need to be overriden'); } diff --git a/src/lib/zookeeper_state.ts b/src/lib/zookeeper_state.ts new file mode 100644 index 00000000..485e1957 --- /dev/null +++ b/src/lib/zookeeper_state.ts @@ -0,0 +1,110 @@ +import ZookeeperClientAsync from './zookeeper_client_async'; +import { logger } from './logger'; + + +const ZOOKEEPER_URL: string = process.env.ZOOKEEPER_URL || 'localhost:2181'; +const ZOOKEEPER_RETRIES: number = parseInt(process.env.ZOOKEEPER_RETRIES || '0'); +const ZOOKEEPER_SPIN_DELAY: number = parseInt(process.env.ZOOKEEPER_SPIN_DELAY || '1000'); +const ZOOKEEPER_SESSION_TIMEOUT: number = parseInt(process.env.ZOOKEEPER_SESSION_TIMEOUT || '30000'); + +const FORMAT_HEADER: string = 'format=json;'; + + +export class ZookeeperState { + private readonly exporter_name: string; + private readonly topicName: string; + private readonly zookeeperClient: ZookeeperClientAsync; + + + constructor(exporter_name: string, topicName: string) { + this.exporter_name = exporter_name; + + this.topicName = topicName; + + this.zookeeperClient = new ZookeeperClientAsync(ZOOKEEPER_URL, + { + sessionTimeout: ZOOKEEPER_SESSION_TIMEOUT, + spinDelay: ZOOKEEPER_SPIN_DELAY, + retries: ZOOKEEPER_RETRIES + } + ); + } + + get zookeeperPositionNode() { + // Generally it may be an arbitrary position object, not necessarily block number. We keep this name for backward compatibility + return `/${this.exporter_name}/${this.topicName}/block-number`; + } + + get zookeeperTimestampNode() { + return `/${this.exporter_name}/${this.topicName}/timestamp`; + } + + /** + * @returns {Promise} Promise, resolved on connection completed. + */ + async connect(): Promise { + logger.info(`Connecting to zookeeper host ${ZOOKEEPER_URL}`); + + try { + await this.zookeeperClient.connectAsync(); + } + catch (ex) { + console.error('Error connecting to Zookeeper: ', ex); + throw ex; + } + } + + /** + * Disconnect from Zookeeper. + */ + async disconnect() { + logger.info(`Disconnecting from zookeeper host ${ZOOKEEPER_URL}`); + await this.zookeeperClient.closeAsync(); + } + + async getLastPosition() { + if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) { + const previousPosition = await this.zookeeperClient.getDataAsync( + this.zookeeperPositionNode + ); + + try { + if (Buffer.isBuffer(previousPosition && previousPosition.data)) { + const value = previousPosition.data.toString('utf8'); + + if (value.startsWith(FORMAT_HEADER)) { + return JSON.parse(value.replace(FORMAT_HEADER, '')); + } else { + return previousPosition.data; + } + } + } catch (err) { + logger.error(err); + } + } + + return null; + } + + async savePosition(position: object) { + if (typeof position !== 'undefined') { + const newNodeValue = Buffer.from( + FORMAT_HEADER + JSON.stringify(position), + 'utf-8' + ); + + if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) { + return this.zookeeperClient.setDataAsync( + this.zookeeperPositionNode, + newNodeValue + ); + } else { + return this.zookeeperClient.mkdirpAsync( + this.zookeeperPositionNode, + newNodeValue + ); + } + } + } +} + diff --git a/src/main.ts b/src/main.ts index 76478982..16ee1601 100644 --- a/src/main.ts +++ b/src/main.ts @@ -4,7 +4,8 @@ import { Server, IncomingMessage, ServerResponse } from 'http' const { send, serve } = require('micro'); const metrics = require('./lib/metrics'); import { logger } from './lib/logger'; -import { Exporter } from './lib/kafka_storage'; +import { KafkaStorage } from './lib/kafka_storage'; +import { ZookeeperState } from './lib/zookeeper_state'; const EXPORTER_NAME = process.env.EXPORTER_NAME || 'san-chain-exporter'; import { EXPORT_TIMEOUT_MLS } from './lib/constants'; import { constructWorker } from './blockchains/construct_worker' @@ -15,7 +16,8 @@ import { BaseWorker } from './lib/worker_base'; export class Main { private worker!: BaseWorker; private shouldWork: boolean; - private exporter!: Exporter; + private kafkaStorage!: KafkaStorage; + private zookeeperState!: ZookeeperState; private lastProcessedPosition!: ExporterPosition; private microServer: Server; @@ -29,17 +31,21 @@ export class Main { async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string) { const INIT_EXPORTER_ERR_MSG = 'Error when initializing exporter: '; - this.exporter = new Exporter(exporterName, isTransactions, kafkaTopic); - await this.exporter + this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic); + this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); + await this.kafkaStorage + .connect() + .then(() => this.kafkaStorage.initTransactions()) + .catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); }); + await this.zookeeperState .connect() - .then(() => this.exporter.initTransactions()) .catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); }); } async handleInitPosition() { - const lastRecoveredPosition = await this.exporter.getLastPosition(); + const lastRecoveredPosition = await this.zookeeperState.getLastPosition(); this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition); - await this.exporter.savePosition(this.lastProcessedPosition); + await this.zookeeperState.savePosition(this.lastProcessedPosition); } #isWorkerSet() { @@ -63,7 +69,7 @@ export class Main { this.#isWorkerSet(); logger.info(`Applying the following settings: ${JSON.stringify(this.getSettingsWithHiddenPasswords(mergedConstants))}`); this.worker = constructWorker(blockchain, mergedConstants); - await this.worker.init(this.exporter); + await this.worker.init(this.kafkaStorage); await this.handleInitPosition(); } @@ -105,9 +111,9 @@ export class Main { this.lastProcessedPosition = this.worker.getLastProcessedPosition(); if (events && events.length > 0) { - await this.exporter.storeEvents(events, constantsBase.WRITE_SIGNAL_RECORDS_KAFKA); + await this.kafkaStorage.storeEvents(events, constantsBase.WRITE_SIGNAL_RECORDS_KAFKA); } - await this.exporter.savePosition(this.lastProcessedPosition); + await this.zookeeperState.savePosition(this.lastProcessedPosition); logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); if (this.shouldWork) { @@ -117,11 +123,13 @@ export class Main { } async disconnect() { - // This call should be refactored to work with async/await - if (this.exporter !== undefined) { - this.exporter.disconnect(); + if (this.kafkaStorage) { + await this.kafkaStorage.disconnect(); + } + if (this.zookeeperState) { + await this.zookeeperState.disconnect(); } - if (this.microServer !== undefined) { + if (this.microServer) { this.microServer.close(); } } @@ -138,7 +146,7 @@ export class Main { } healthcheckKafka(): Promise { - if (this.exporter.isConnected()) { + if (this.kafkaStorage.isConnected()) { return Promise.resolve(); } else { return Promise.reject('Kafka client is not connected to any brokers'); diff --git a/src/test/index.test.ts b/src/test/index.test.ts index 0793c54b..a45e1e18 100644 --- a/src/test/index.test.ts +++ b/src/test/index.test.ts @@ -8,7 +8,8 @@ import { Main } from '../main'; const { Main: MainRewired } = rewire('../main'); const { main } = rewire('../index'); import { BaseWorker } from '../lib/worker_base'; -import { Exporter } from '../lib/kafka_storage'; +import { KafkaStorage } from '../lib/kafka_storage'; +import { ZookeeperState } from '../lib/zookeeper_state'; import { ETHWorker } from '../blockchains/eth/eth_worker'; import * as ethConstants from '../blockchains/eth/lib/constants'; import zkClientAsync from '../lib/zookeeper_client_async'; @@ -21,8 +22,14 @@ describe('Main tests', () => { START_PRIMARY_KEY: -1 }; + let sandbox: any = null; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + afterEach(() => { - sinon.restore(); + sandbox.restore(); }); it('initExporter returns error when Exporter connect() fails', async () => { @@ -31,6 +38,10 @@ describe('Main tests', () => { .rejects(new Error('Exporter connection failed')); const mainInstance = new Main(); + + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + try { await mainInstance.init(blockchain); } catch (err) { @@ -44,13 +55,8 @@ describe('Main tests', () => { }); it('initExporter returns error when Exporter initTransactions() fails', async () => { - sinon - .stub(Exporter.prototype, 'connect') - .resolves(); - - sinon - .stub(Exporter.prototype, 'initTransactions') - .rejects(new Error('Exporter initTransactions failed')); + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').rejects(new Error('Exporter initTransactions failed')); const mainInstance = new Main(); try { @@ -66,11 +72,11 @@ describe('Main tests', () => { }); it('handleInitPosition changes the lastProcessedPosition accordingly 1', async () => { - const exporterStub = sinon.createStubInstance(Exporter); - exporterStub.getLastPosition.returns(JSON.parse('{"blockNumber":123456,"primaryKey":0}')); + const zookeeperStub = sandbox.createStubInstance(ZookeeperState); + zookeeperStub.getLastPosition.returns(JSON.parse('{"blockNumber":123456,"primaryKey":0}')); const mainInstance = new MainRewired(); - mainInstance.exporter = exporterStub; + mainInstance.zookeeperState = zookeeperStub; mainInstance.worker = new BaseWorker(constants); sinon.spy(mainInstance, 'handleInitPosition'); @@ -82,11 +88,11 @@ describe('Main tests', () => { }); it('handleInitPosition changes the lastProcessedPosition accordingly 2', async () => { - const exporterStub = sinon.createStubInstance(Exporter); - exporterStub.getLastPosition.returns(null); + const zookeeperStub = sandbox.createStubInstance(ZookeeperState); + zookeeperStub.getLastPosition.returns(null); const mainInstance = new MainRewired(); - mainInstance.exporter = exporterStub; + mainInstance.zookeeperState = zookeeperStub; mainInstance.worker = new BaseWorker(constants); sinon.spy(mainInstance, 'handleInitPosition'); @@ -98,11 +104,11 @@ describe('Main tests', () => { }); it('handleInitPosition throws error when exporter.getLastPosition() fails', async () => { - const exporterStub = sinon.createStubInstance(Exporter); - exporterStub.getLastPosition.throws(new Error('Exporter getLastPosition failed')); + const zookeeperStub = sandbox.createStubInstance(ZookeeperState); + zookeeperStub.getLastPosition.throws(new Error('Exporter getLastPosition failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'exporter').value(exporterStub); + sinon.stub(mainInstance, 'zookeeperState').value(zookeeperStub); sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { @@ -119,12 +125,12 @@ describe('Main tests', () => { }); it('handleInitPosition throws error when exporter.savePosition() fails', async () => { - const exporterStub = sinon.createStubInstance(Exporter); - exporterStub.getLastPosition.returns(null); - exporterStub.savePosition.throws(new Error('Exporter savePosition failed')); + const zookeeperStub = sandbox.createStubInstance(ZookeeperState); + zookeeperStub.getLastPosition.returns(null); + zookeeperStub.savePosition.throws(new Error('Exporter savePosition failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'exporter').value(exporterStub); + sinon.stub(mainInstance, 'zookeeperState').value(zookeeperStub); sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { @@ -142,7 +148,7 @@ describe('Main tests', () => { it('initWorker throws error when worker is already present', async () => { const mainInstance = new Main(); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { await mainInstance.initWorker('eth', {}); assert.fail('initWorker should have thrown an error'); @@ -158,9 +164,8 @@ describe('Main tests', () => { it('initWorker throws an error when worker.init() fails', async () => { const mainInstance = new Main(); - sinon.stub(mainInstance, 'exporter').value(new Exporter('test-exporter', true, 'topic-not-used')); - sinon.stub(ETHWorker.prototype, 'init').rejects(new Error('Worker init failed')); + sandbox.stub(ETHWorker.prototype, 'init').rejects(new Error('Worker init failed')); try { await mainInstance.initWorker('eth', ethConstants); @@ -177,10 +182,9 @@ describe('Main tests', () => { it('initWorker throws an error when handleInitPosition() fails', async () => { const mainInstance = new Main(); - sinon.stub(mainInstance, 'exporter').value(new Exporter('test-exporter', true, 'topic-not-used')); - sinon.stub(ETHWorker.prototype, 'init').resolves(); + sandbox.stub(ETHWorker.prototype, 'init').resolves(); - sinon.stub(mainInstance, 'handleInitPosition').throws(new Error('Error when initializing position')); + sandbox.stub(mainInstance, 'handleInitPosition').throws(new Error('Error when initializing position')); try { await mainInstance.initWorker('eth', ethConstants); @@ -197,18 +201,17 @@ describe('Main tests', () => { it('initWorker success', async () => { const mainInstance = new MainRewired(); - mainInstance.exporter = new Exporter('test-exporter', true, 'topic-not-used'); - sinon.stub(ETHWorker.prototype, 'init').resolves(); - sinon.stub(mainInstance, 'handleInitPosition').resolves(); + sandbox.stub(ETHWorker.prototype, 'init').resolves(); + sandbox.stub(mainInstance, 'handleInitPosition').resolves(); await mainInstance.initWorker('eth', ethConstants); assert(mainInstance.handleInitPosition.calledOnce); }); - it('workLoop throws error when worker can\'t be initialised', async () => { - sinon.stub(BaseWorker.prototype, 'work').rejects(new Error('Error in worker "work" method')); + it('workLoop throws error when worker can not be initialised', async () => { + sandbox.stub(BaseWorker.prototype, 'work').rejects(new Error('Error in worker "work" method')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { await mainInstance.workLoop(); assert.fail('workLoop should have thrown an error'); @@ -223,13 +226,13 @@ describe('Main tests', () => { }); it('workLoop throws error when storeEvents() fails', async () => { - sinon.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); - sinon.stub(Main.prototype, 'updateMetrics').returns(null); - sinon.stub(Exporter.prototype, 'storeEvents').rejects(new Error('storeEvents failed')); + sandbox.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); + sandbox.stub(Main.prototype, 'updateMetrics').returns(null); + sandbox.stub(KafkaStorage.prototype, 'storeEvents').rejects(new Error('storeEvents failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); - sinon.stub(mainInstance, 'exporter').value(new Exporter('test-exporter', true, 'topic-not-used')); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'kafkaStorage').value(new KafkaStorage('test-exporter', true, 'topic-not-used')); try { await mainInstance.workLoop(); assert.fail('workLoop should have thrown an error'); @@ -244,14 +247,15 @@ describe('Main tests', () => { }); it('workLoop throws error when savePosition() fails', async () => { - sinon.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); - sinon.stub(Main.prototype, 'updateMetrics').returns(null); - sinon.stub(Exporter.prototype, 'storeEvents').resolves(); - sinon.stub(Exporter.prototype, 'savePosition').rejects(new Error('savePosition failed')); + sandbox.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); + sandbox.stub(Main.prototype, 'updateMetrics').returns(null); + sandbox.stub(KafkaStorage.prototype, 'storeEvents').resolves(); + sandbox.stub(ZookeeperState.prototype, 'savePosition').rejects(new Error('savePosition failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); - sinon.stub(mainInstance, 'exporter').value(new Exporter('test-exporter', true, 'topic-not-used')); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'kafkaStorage').value(new KafkaStorage('test-exporter', true, 'topic-not-used')); + sandbox.stub(mainInstance, 'zookeeperState').value(new ZookeeperState('test-exporter', 'topic-not-used')); try { await mainInstance.workLoop(); @@ -269,12 +273,18 @@ describe('Main tests', () => { describe('main function', () => { + let sandbox: any = null; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + afterEach(() => { - sinon.restore(); + sandbox.restore(); }); it('main function throws error when initialization fails', async () => { - sinon.stub(Main.prototype, 'init').rejects(new Error('Main init failed')); + sandbox.stub(Main.prototype, 'init').rejects(new Error('Main init failed')); try { await main(); @@ -290,8 +300,8 @@ describe('main function', () => { }); it('main function throws error when workLoop fails', async () => { - sinon.stub(Main.prototype, 'init').resolves(); - sinon.stub(Main.prototype, 'workLoop').rejects(new Error('Main workLoop failed')); + sandbox.stub(Main.prototype, 'init').resolves(); + sandbox.stub(Main.prototype, 'workLoop').rejects(new Error('Main workLoop failed')); try { await main(); @@ -307,9 +317,9 @@ describe('main function', () => { }); it('main function throws error when disconnecting fails', async () => { - sinon.stub(Main.prototype, 'init').resolves(); - sinon.stub(Main.prototype, 'workLoop').resolves(); - sinon.stub(Main.prototype, 'disconnect').rejects(new Error('Main disconnect failed')); + sandbox.stub(Main.prototype, 'init').resolves(); + sandbox.stub(Main.prototype, 'workLoop').resolves(); + sandbox.stub(Main.prototype, 'disconnect').rejects(new Error('Main disconnect failed')); try { await main(); @@ -325,9 +335,9 @@ describe('main function', () => { }); it('main function works', async () => { - const initStub = sinon.stub(Main.prototype, 'init').resolves(); - const workLoopStub = sinon.stub(Main.prototype, 'workLoop').resolves(); - const disconnectStub = sinon.stub(Main.prototype, 'disconnect').resolves(); + const initStub = sandbox.stub(Main.prototype, 'init').resolves(); + const workLoopStub = sandbox.stub(Main.prototype, 'workLoop').resolves(); + const disconnectStub = sandbox.stub(Main.prototype, 'disconnect').resolves(); await main(); From 23cc72035a0db3305f055f9a842ef3e4771f12a4 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Wed, 10 Jul 2024 13:49:14 +0300 Subject: [PATCH 2/8] Support multiple topics initial --- src/blockchains/erc20/erc20_worker.ts | 8 +++--- src/blockchains/utxo/utxo_worker.ts | 7 +++-- src/lib/worker_base.ts | 7 +++-- src/main.ts | 38 ++++++++++++++++++--------- 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/src/blockchains/erc20/erc20_worker.ts b/src/blockchains/erc20/erc20_worker.ts index 21f40e69..5928a9ab 100644 --- a/src/blockchains/erc20/erc20_worker.ts +++ b/src/blockchains/erc20/erc20_worker.ts @@ -59,7 +59,7 @@ export class ERC20Worker extends BaseWorker { this.allOldContracts = []; } - async init(exporter?: KafkaStorage) { + async init(storages: KafkaStorage[]) { this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS; if (this.settings.EXPORT_BLOCKS_LIST) { @@ -84,10 +84,10 @@ export class ERC20Worker extends BaseWorker { } if (this.settings.EVENTS_IN_SAME_PARTITION) { - if (exporter === undefined) { - throw Error('Exporter reference need to be provided for events in same partition') + if (!storages || storages.length != 1) { + throw Error('Single Kafka storage needs to be provided for events in same partition') } - await exporter.initPartitioner((event: any) => simpleHash(event.contract)); + await storages[0].initPartitioner((event: any) => simpleHash(event.contract)); } } diff --git a/src/blockchains/utxo/utxo_worker.ts b/src/blockchains/utxo/utxo_worker.ts index 1ec40de4..73823ce8 100644 --- a/src/blockchains/utxo/utxo_worker.ts +++ b/src/blockchains/utxo/utxo_worker.ts @@ -33,10 +33,13 @@ export class UTXOWorker extends BaseWorker { this.client = constructRPCClient(this.NODE_URL, this.RPC_USERNAME, this.RPC_PASSWORD, this.DEFAULT_TIMEOUT); } - async init(exporter: KafkaStorage) { + async init(storages: KafkaStorage[]) { const blockchainInfo = await this.sendRequestWithRetry('getblockchaininfo', []); this.lastConfirmedBlock = blockchainInfo.blocks - this.CONFIRMATIONS; - await exporter.initPartitioner((event: any) => event['height']); + if (!storages || storages.length != 1) { + throw Error('Single Kafka storage needs to be provided for UTXO exporter') + } + await storages[0].initPartitioner((event: any) => event['height']); } async sendRequest(method: string, params: any) { diff --git a/src/lib/worker_base.ts b/src/lib/worker_base.ts index d547544a..0ab435bd 100644 --- a/src/lib/worker_base.ts +++ b/src/lib/worker_base.ts @@ -3,6 +3,9 @@ import { logger } from './logger'; import { KafkaStorage } from './kafka_storage'; import { ExporterPosition } from '../types' +export type WorkResult = any[] +export type WorkResultMultiMode = Map + export class BaseWorker { public lastExportTime: number; public lastConfirmedBlock: number; @@ -30,11 +33,11 @@ export class BaseWorker { * Upon returning from the method call the implementation should have updated all the member variables of the * base class. */ - work(): Promise> { + work(): Promise { throw new Error('"work" method need to be overriden'); } // To be implemented on inheritance. - async init(_exporter: KafkaStorage) { + async init(_storage: KafkaStorage | Map) { throw new Error('"init" method need to be overriden'); } diff --git a/src/main.ts b/src/main.ts index 16ee1601..eba84efc 100644 --- a/src/main.ts +++ b/src/main.ts @@ -11,12 +11,12 @@ import { EXPORT_TIMEOUT_MLS } from './lib/constants'; import { constructWorker } from './blockchains/construct_worker' import * as constantsBase from './lib/constants'; import { ExporterPosition } from './types' -import { BaseWorker } from './lib/worker_base'; +import { BaseWorker, WorkResult, WorkResultMultiMode } from './lib/worker_base'; export class Main { private worker!: BaseWorker; private shouldWork: boolean; - private kafkaStorage!: KafkaStorage; + private kafkaStorage!: KafkaStorage | Map; private zookeeperState!: ZookeeperState; private lastProcessedPosition!: ExporterPosition; private microServer: Server; @@ -29,17 +29,29 @@ export class Main { )) } - async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string) { + async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string | Map) { const INIT_EXPORTER_ERR_MSG = 'Error when initializing exporter: '; - this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic); - this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); - await this.kafkaStorage - .connect() - .then(() => this.kafkaStorage.initTransactions()) - .catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); }); - await this.zookeeperState - .connect() - .catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); }); + if (typeof kafkaTopic === 'string') { + this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic); + this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); + } + else if (kafkaTopic instanceof Map) { + this.kafkaStorage = new Map(Array.from(kafkaTopic, ([mode, topic]) => [mode, new KafkaStorage(exporterName, isTransactions, topic)])) + const kafkaTopicConcat = Array.from(kafkaTopic.keys()).join('-') + this.zookeeperState = new ZookeeperState(exporterName, kafkaTopicConcat); + } else { + throw new Error(`kafkaTopic variable should be either string or Map. It is: ${kafkaTopic}`); + } + + + try { + const kafkaStoragesArray = (this.kafkaStorage instanceof Map) ? Array.from(this.kafkaStorage.values()) : [this.kafkaStorage] + await Promise.all(kafkaStoragesArray.map(storage => storage.connect().then(() => storage.initTransactions()))) + await this.zookeeperState.connect(); + } + catch (err: any) { + throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); + } } async handleInitPosition() { @@ -103,7 +115,7 @@ export class Main { async workLoop() { while (this.shouldWork) { this.worker.lastRequestStartTime = Date.now(); - const events = await this.worker.work(); + const workResult: WorkResult | WorkResultMultiMode = await this.worker.work(); this.worker.lastExportTime = Date.now(); From f88493e9eea8e19c109cfddb2c2a6b1fcc925f91 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Wed, 10 Jul 2024 17:03:30 +0300 Subject: [PATCH 3/8] Support multiple topics working --- src/blockchains/erc20/erc20_worker.ts | 10 +-- src/blockchains/utxo/utxo_worker.ts | 10 +-- src/index.ts | 7 +- src/main.ts | 113 ++++++++++++++++--------- src/test/erc20/worker.spec.ts | 9 +- src/test/index.test.ts | 114 +++++++++++++++++++++----- 6 files changed, 181 insertions(+), 82 deletions(-) diff --git a/src/blockchains/erc20/erc20_worker.ts b/src/blockchains/erc20/erc20_worker.ts index 5928a9ab..71ab5644 100644 --- a/src/blockchains/erc20/erc20_worker.ts +++ b/src/blockchains/erc20/erc20_worker.ts @@ -5,7 +5,7 @@ import { constructRPCClient } from '../../lib/http_client'; import { extendEventsWithPrimaryKey } from './lib/extend_events_key'; import { ContractOverwrite, changeContractAddresses, extractChangedContractAddresses } from './lib/contract_overwrite'; import { stableSort, readJsonFile } from './lib/util'; -import { BaseWorker } from '../../lib/worker_base'; +import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base'; import { nextIntervalCalculator, setWorkerSleepTime, analyzeWorkerContext, NO_WORK_SLEEP } from '../eth/lib/next_interval_calculator'; import { Web3Interface, constructWeb3Wrapper } from '../eth/lib/web3_wrapper'; import { TimestampsCache } from './lib/timestamps_cache'; @@ -59,7 +59,7 @@ export class ERC20Worker extends BaseWorker { this.allOldContracts = []; } - async init(storages: KafkaStorage[]) { + async init(storage: KafkaStorage | Map) { this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS; if (this.settings.EXPORT_BLOCKS_LIST) { @@ -84,10 +84,10 @@ export class ERC20Worker extends BaseWorker { } if (this.settings.EVENTS_IN_SAME_PARTITION) { - if (!storages || storages.length != 1) { + if (!(storage instanceof KafkaStorage)) { throw Error('Single Kafka storage needs to be provided for events in same partition') } - await storages[0].initPartitioner((event: any) => simpleHash(event.contract)); + await storage.initPartitioner((event: any) => simpleHash(event.contract)); } } @@ -112,7 +112,7 @@ export class ERC20Worker extends BaseWorker { }; } - async work() { + async work(): Promise { const workerContext = await analyzeWorkerContext(this); setWorkerSleepTime(this, workerContext); if (workerContext === NO_WORK_SLEEP) return []; diff --git a/src/blockchains/utxo/utxo_worker.ts b/src/blockchains/utxo/utxo_worker.ts index 73823ce8..6d5793c9 100644 --- a/src/blockchains/utxo/utxo_worker.ts +++ b/src/blockchains/utxo/utxo_worker.ts @@ -1,7 +1,7 @@ 'use strict'; import { logger } from '../../lib/logger'; import { constructRPCClient } from '../../lib/http_client'; -import { BaseWorker } from '../../lib/worker_base'; +import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base'; import { KafkaStorage } from '../../lib/kafka_storage'; import { HTTPClientInterface } from '../../types'; @@ -33,13 +33,13 @@ export class UTXOWorker extends BaseWorker { this.client = constructRPCClient(this.NODE_URL, this.RPC_USERNAME, this.RPC_PASSWORD, this.DEFAULT_TIMEOUT); } - async init(storages: KafkaStorage[]) { + async init(storage: KafkaStorage | Map) { const blockchainInfo = await this.sendRequestWithRetry('getblockchaininfo', []); this.lastConfirmedBlock = blockchainInfo.blocks - this.CONFIRMATIONS; - if (!storages || storages.length != 1) { + if (!(storage instanceof KafkaStorage)) { throw Error('Single Kafka storage needs to be provided for UTXO exporter') } - await storages[0].initPartitioner((event: any) => event['height']); + await storage.initPartitioner((event: any) => event['height']); } async sendRequest(method: string, params: any) { @@ -83,7 +83,7 @@ export class UTXOWorker extends BaseWorker { return await this.sendRequestWithRetry('getblock', [blockHash, 2]); } - async work() { + async work(): Promise { if (this.lastConfirmedBlock === this.lastExportedBlock) { this.sleepTimeMsec = this.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; diff --git a/src/index.ts b/src/index.ts index 6f2d1910..4d218041 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ 'use strict'; import { logger } from './lib/logger'; -import { BLOCKCHAIN } from './lib/constants'; +import * as constantsBase from './lib/constants'; import { getBoolEnvVariable } from './lib/utils'; import { Main } from './main' @@ -11,10 +11,7 @@ export async function main() { mainInstance = new Main(); try { - if (BLOCKCHAIN === undefined) { - throw Error("'BLOCKCHAIN' variable need to be defined") - } - await mainInstance.init(BLOCKCHAIN); + await mainInstance.init(constantsBase); } catch (err: any) { logger.error(err.stack); throw new Error(`Error initializing exporter: ${err.message}`); diff --git a/src/main.ts b/src/main.ts index eba84efc..131971d2 100644 --- a/src/main.ts +++ b/src/main.ts @@ -9,7 +9,6 @@ import { ZookeeperState } from './lib/zookeeper_state'; const EXPORTER_NAME = process.env.EXPORTER_NAME || 'san-chain-exporter'; import { EXPORT_TIMEOUT_MLS } from './lib/constants'; import { constructWorker } from './blockchains/construct_worker' -import * as constantsBase from './lib/constants'; import { ExporterPosition } from './types' import { BaseWorker, WorkResult, WorkResultMultiMode } from './lib/worker_base'; @@ -20,6 +19,7 @@ export class Main { private zookeeperState!: ZookeeperState; private lastProcessedPosition!: ExporterPosition; private microServer: Server; + private mergedConstants: any; constructor() { this.shouldWork = true; @@ -30,7 +30,6 @@ export class Main { } async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string | Map) { - const INIT_EXPORTER_ERR_MSG = 'Error when initializing exporter: '; if (typeof kafkaTopic === 'string') { this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic); this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); @@ -44,14 +43,9 @@ export class Main { } - try { - const kafkaStoragesArray = (this.kafkaStorage instanceof Map) ? Array.from(this.kafkaStorage.values()) : [this.kafkaStorage] - await Promise.all(kafkaStoragesArray.map(storage => storage.connect().then(() => storage.initTransactions()))) - await this.zookeeperState.connect(); - } - catch (err: any) { - throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); - } + const kafkaStoragesArray = (this.kafkaStorage instanceof Map) ? Array.from(this.kafkaStorage.values()) : [this.kafkaStorage] + await Promise.all(kafkaStoragesArray.map(storage => storage.connect().then(() => storage.initTransactions()))) + await this.zookeeperState.connect(); } async handleInitPosition() { @@ -77,19 +71,22 @@ export class Main { return copy; } - async initWorker(blockchain: string, mergedConstants: any) { + private async initWorker() { this.#isWorkerSet(); - logger.info(`Applying the following settings: ${JSON.stringify(this.getSettingsWithHiddenPasswords(mergedConstants))}`); - this.worker = constructWorker(blockchain, mergedConstants); + logger.info(`Applying the following settings: ${JSON.stringify(this.getSettingsWithHiddenPasswords(this.mergedConstants))}`); + this.worker = constructWorker(this.mergedConstants.BLOCKCHAIN, this.mergedConstants); await this.worker.init(this.kafkaStorage); await this.handleInitPosition(); } - async init(blockchain: string) { - const blockchainSpecificConstants = require(`./blockchains/${blockchain}/lib/constants`); - const mergedConstants = { ...constantsBase, ...blockchainSpecificConstants }; - await this.initExporter(EXPORTER_NAME, true, mergedConstants.KAFKA_TOPIC); - await this.initWorker(blockchain, mergedConstants); + async init(constantsBase: any) { + if (constantsBase.BLOCKCHAIN === undefined) { + throw Error("'BLOCKCHAIN' variable need to be defined") + } + const blockchainSpecificConstants = require(`./blockchains/${constantsBase.BLOCKCHAIN}/lib/constants`); + this.mergedConstants = { ...constantsBase, ...blockchainSpecificConstants }; + await this.initExporter(EXPORTER_NAME, true, this.mergedConstants.KAFKA_TOPIC); + await this.initWorker(); metrics.startCollection(); this.microServer.on('error', (err) => { @@ -112,6 +109,34 @@ export class Main { metrics.lastExportedBlock.set(this.worker.lastExportedBlock); } + async writeDataToKafka(workResult: WorkResult | WorkResultMultiMode) { + if (Array.isArray(workResult)) { + if (!(this.kafkaStorage instanceof KafkaStorage)) { + throw new Error('Worker returns data for single Kafka storage and multiple are defined') + } + + if (workResult.length > 0) { + await this.kafkaStorage.storeEvents(workResult, this.mergedConstants.WRITE_SIGNAL_RECORDS_KAFKA); + } + } + else if (workResult instanceof Map) { + if (!(this.kafkaStorage instanceof Map)) { + throw new Error('Worker returns data for multiple Kafka storages and single is defined') + } + for (const [mode, data] of workResult.entries()) { + const kafkaStoragePerMode = this.kafkaStorage.get(mode) + if (!kafkaStoragePerMode) { + throw Error(`Workers returns data for mode ${mode} and no worker is defined for this mode`) + } + + await kafkaStoragePerMode.storeEvents(data, this.mergedConstants.WRITE_SIGNAL_RECORDS_KAFKA); + } + } + else { + throw new Error('Worker returns unexpected data type') + } + } + async workLoop() { while (this.shouldWork) { this.worker.lastRequestStartTime = Date.now(); @@ -122,9 +147,8 @@ export class Main { this.updateMetrics(); this.lastProcessedPosition = this.worker.getLastProcessedPosition(); - if (events && events.length > 0) { - await this.kafkaStorage.storeEvents(events, constantsBase.WRITE_SIGNAL_RECORDS_KAFKA); - } + await this.writeDataToKafka(workResult); + await this.zookeeperState.savePosition(this.lastProcessedPosition); logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); @@ -135,9 +159,12 @@ export class Main { } async disconnect() { - if (this.kafkaStorage) { + if (this.kafkaStorage instanceof KafkaStorage) { await this.kafkaStorage.disconnect(); } + else if (this.kafkaStorage instanceof Map) { + await Promise.all(Array.from(this.kafkaStorage.values()).map(storage => storage.disconnect())); + } if (this.zookeeperState) { await this.zookeeperState.disconnect(); } @@ -157,29 +184,32 @@ export class Main { } } - healthcheckKafka(): Promise { - if (this.kafkaStorage.isConnected()) { - return Promise.resolve(); - } else { - return Promise.reject('Kafka client is not connected to any brokers'); + healthcheckKafka(): boolean { + if (this.kafkaStorage instanceof KafkaStorage) { + return this.kafkaStorage.isConnected(); + } + else if (this.kafkaStorage instanceof Map) { + return Array.from(this.kafkaStorage.values()).every(storage => storage.isConnected()); + } + else { + return false; } } - healthcheckExportTimeout(): Promise { + healthcheckExportTimeout(): boolean { const timeFromLastExport = Date.now() - this.worker.lastExportTime; const isExportTimeoutExceeded = timeFromLastExport > EXPORT_TIMEOUT_MLS; if (isExportTimeoutExceeded) { - const errorMessage = `Time from the last export ${timeFromLastExport}ms exceeded limit ` + - `${EXPORT_TIMEOUT_MLS}ms. Node last block is ${this.worker.lastConfirmedBlock}.`; - return Promise.reject(errorMessage); + logger.warn(`Time from the last export ${timeFromLastExport}ms exceeded limit ` + + `${EXPORT_TIMEOUT_MLS}ms. Node last block is ${this.worker.lastConfirmedBlock}.`); + return false; } else { - return Promise.resolve(); + return true; } } - healthcheck(): Promise { - return this.healthcheckKafka() - .then(() => this.healthcheckExportTimeout()); + healthcheck(): boolean { + return this.healthcheckKafka() && this.healthcheckExportTimeout(); } } @@ -198,12 +228,13 @@ const microHandler = async (request: IncomingMessage, response: ServerResponse, switch (req.pathname) { case '/healthcheck': - return mainInstance.healthcheck() - .then(() => send(response, 200, 'ok')) - .catch((err: any) => { - logger.error(`Healthcheck failed: ${err.toString()}`); - send(response, 500, err.toString()); - }); + if (mainInstance.healthcheck()) { + return send(response, 200, 'ok'); + } + else { + logger.error('Healthcheck failed'); + return send(response, 500, "Healthcheck failed"); + } case '/metrics': response.setHeader('Content-Type', metrics.register.contentType); return send(response, 200, await metrics.register.metrics()); diff --git a/src/test/erc20/worker.spec.ts b/src/test/erc20/worker.spec.ts index d414e29a..06864fa7 100644 --- a/src/test/erc20/worker.spec.ts +++ b/src/test/erc20/worker.spec.ts @@ -7,6 +7,7 @@ import { ContractOverwrite } from '../../blockchains/erc20/lib/contract_overwrit import helpers from './helpers'; import { ERC20Transfer } from '../../blockchains/erc20/erc20_types'; import { MockWeb3Wrapper } from '../eth/mock_web3_wrapper'; +import { KafkaStorage } from '../../lib/kafka_storage'; @@ -79,7 +80,7 @@ describe('Test ERC20 worker', function () { sinon.stub(worker, 'ethClient').value(new MockEthClient()) sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]); - await worker.init(undefined); + await worker.init(sinon.createStubInstance(KafkaStorage)); worker.lastConfirmedBlock = 1; worker.lastExportedBlock = 0; @@ -98,7 +99,7 @@ describe('Test ERC20 worker', function () { sinon.stub(worker, 'web3Wrapper').value(new MockWeb3Wrapper(1)) sinon.stub(worker, 'ethClient').value(new MockEthClient()) sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]); - await worker.init(undefined); + await worker.init(sinon.createStubInstance(KafkaStorage)); sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite( { @@ -135,7 +136,7 @@ describe('Test ERC20 worker', function () { sinon.stub(worker, 'web3Wrapper').value(new MockWeb3Wrapper(1)) sinon.stub(worker, 'ethClient').value(new MockEthClient()) sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]); - await worker.init(undefined); + await worker.init(sinon.createStubInstance(KafkaStorage)); sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite( { @@ -174,7 +175,7 @@ describe('Test ERC20 worker', function () { sinon.stub(worker, 'ethClient').value(new MockEthClient()) sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent, originalEvent2]); - await worker.init(undefined); + await worker.init(sinon.createStubInstance(KafkaStorage)); sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite( { diff --git a/src/test/index.test.ts b/src/test/index.test.ts index a45e1e18..f1a760e4 100644 --- a/src/test/index.test.ts +++ b/src/test/index.test.ts @@ -1,6 +1,7 @@ const sinon = require('sinon'); const rewire = require('rewire'); import assert from 'assert'; +import { Server } from 'http' // For this test, presume we are creating the ETH worker process.env.BLOCKCHAIN = 'eth'; process.env.TEST_ENV = 'true'; @@ -11,15 +12,15 @@ import { BaseWorker } from '../lib/worker_base'; import { KafkaStorage } from '../lib/kafka_storage'; import { ZookeeperState } from '../lib/zookeeper_state'; import { ETHWorker } from '../blockchains/eth/eth_worker'; -import * as ethConstants from '../blockchains/eth/lib/constants'; import zkClientAsync from '../lib/zookeeper_client_async'; -const blockchain = 'eth'; + describe('Main tests', () => { const constants = { START_BLOCK: -1, - START_PRIMARY_KEY: -1 + START_PRIMARY_KEY: -1, + BLOCKCHAIN: 'eth' }; let sandbox: any = null; @@ -43,10 +44,14 @@ describe('Main tests', () => { sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); try { - await mainInstance.init(blockchain); + const mergedConstants = { + KAFKA_TOPIC: 'NOT_USED', + ...constants + } + await mainInstance.init(mergedConstants); } catch (err) { if (err instanceof Error) { - assert.strictEqual(err.message, 'Error when initializing exporter: Exporter connection failed'); + assert.strictEqual(err.message, 'Exporter connection failed'); } else { assert.fail('Exception is not an instance of Error') @@ -59,11 +64,15 @@ describe('Main tests', () => { sandbox.stub(KafkaStorage.prototype, 'initTransactions').rejects(new Error('Exporter initTransactions failed')); const mainInstance = new Main(); + const mergedConstants = { + KAFKA_TOPIC: 'NOT_USED', + ...constants + } try { - await mainInstance.init(blockchain); + await mainInstance.init(mergedConstants); } catch (err) { if (err instanceof Error) { - assert.strictEqual(err.message, 'Error when initializing exporter: Exporter initTransactions failed'); + assert.strictEqual(err.message, 'Exporter initTransactions failed'); } else { assert.fail('Exception is not an instance of Error') @@ -146,11 +155,19 @@ describe('Main tests', () => { } }); - it('initWorker throws error when worker is already present', async () => { + it('init throws error when worker is already present', async () => { + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); + const mainInstance = new Main(); sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + const mergedConstants = { + KAFKA_TOPIC: 'NOT_USED', + ...constants + } try { - await mainInstance.initWorker('eth', {}); + await mainInstance.init(mergedConstants); assert.fail('initWorker should have thrown an error'); } catch (err) { if (err instanceof Error) { @@ -162,13 +179,21 @@ describe('Main tests', () => { } }); - it('initWorker throws an error when worker.init() fails', async () => { + it('init throws an error when worker.init() fails', async () => { + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); + const mainInstance = new Main(); + const mergedConstants = { + KAFKA_TOPIC: 'NOT_USED', + ...constants + } sandbox.stub(ETHWorker.prototype, 'init').rejects(new Error('Worker init failed')); try { - await mainInstance.initWorker('eth', ethConstants); + await mainInstance.init(mergedConstants); assert.fail('initWorker should have thrown an error'); } catch (err) { if (err instanceof Error) { @@ -180,14 +205,21 @@ describe('Main tests', () => { } }); - it('initWorker throws an error when handleInitPosition() fails', async () => { + it('init throws an error when handleInitPosition() fails', async () => { + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); const mainInstance = new Main(); sandbox.stub(ETHWorker.prototype, 'init').resolves(); sandbox.stub(mainInstance, 'handleInitPosition').throws(new Error('Error when initializing position')); + const mergedConstants = { + KAFKA_TOPIC: 'NOT_USED', + ...constants + } try { - await mainInstance.initWorker('eth', ethConstants); + await mainInstance.init(mergedConstants); assert.fail('initWorker should have thrown an error'); } catch (err) { if (err instanceof Error) { @@ -200,12 +232,25 @@ describe('Main tests', () => { }); it('initWorker success', async () => { - const mainInstance = new MainRewired(); + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); + sandbox.stub(Server.prototype, 'on') + sandbox.stub(Server.prototype, 'listen') + const mainInstance = new Main(); + sandbox.stub(ETHWorker.prototype, 'init').resolves(); - sandbox.stub(mainInstance, 'handleInitPosition').resolves(); + const handleInitPositionStub = sandbox.stub(mainInstance, 'handleInitPosition') - await mainInstance.initWorker('eth', ethConstants); - assert(mainInstance.handleInitPosition.calledOnce); + + handleInitPositionStub.resolves(); + + const mergedConstants = { + KAFKA_TOPIC: 'NOT_USED', + ...constants + } + await mainInstance.init(mergedConstants); + assert(handleInitPositionStub.calledOnce); }); it('workLoop throws error when worker can not be initialised', async () => { @@ -226,13 +271,39 @@ describe('Main tests', () => { }); it('workLoop throws error when storeEvents() fails', async () => { + class MockWorker extends BaseWorker { + constructor() { + super({}); + } + + work() { + return Promise.resolve([{}]) + } + getLastProcessedPostion() { + return {} + } + } + + class MockKafkaStorage extends KafkaStorage { + constructor() { + super('not_used', false, 'not_used') + } + + async storeEvents() { + console.log("Store events mock") + throw new Error('storeEvents failed'); + } + } + sandbox.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); sandbox.stub(Main.prototype, 'updateMetrics').returns(null); sandbox.stub(KafkaStorage.prototype, 'storeEvents').rejects(new Error('storeEvents failed')); const mainInstance = new Main(); - sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); - sandbox.stub(mainInstance, 'kafkaStorage').value(new KafkaStorage('test-exporter', true, 'topic-not-used')); + sandbox.stub(mainInstance, 'worker').value(new MockWorker()); + sandbox.stub(mainInstance, 'kafkaStorage').value(new MockKafkaStorage()); + sandbox.stub(mainInstance, 'mergedConstants').value({ WRITE_SIGNAL_RECORDS_KAFKA: false }); + try { await mainInstance.workLoop(); assert.fail('workLoop should have thrown an error'); @@ -256,6 +327,7 @@ describe('Main tests', () => { sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); sandbox.stub(mainInstance, 'kafkaStorage').value(new KafkaStorage('test-exporter', true, 'topic-not-used')); sandbox.stub(mainInstance, 'zookeeperState').value(new ZookeeperState('test-exporter', 'topic-not-used')); + sandbox.stub(mainInstance, 'mergedConstants').value({ WRITE_SIGNAL_RECORDS_KAFKA: false }); try { await mainInstance.workLoop(); @@ -345,6 +417,4 @@ describe('main function', () => { sinon.assert.calledOnce(workLoopStub); sinon.assert.calledOnce(disconnectStub); }); -} - -); +}); From 9af0ebba23cb54f61e2b004c2224ca3d17f39d2b Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Mon, 22 Jul 2024 20:46:03 +0300 Subject: [PATCH 4/8] ETH native token and receipts unified in ETHWorker --- src/blockchains/construct_worker.ts | 3 - src/blockchains/eth/eth_worker.ts | 88 ++++++++++++++----- src/blockchains/eth/lib/constants.ts | 2 + src/blockchains/eth/lib/fees_decoder.ts | 14 +-- src/blockchains/eth/lib/fetch_data.ts | 13 +-- .../helper.js => eth/lib/helper_receipts.ts} | 47 +++++----- src/blockchains/receipts/receipts_worker.ts | 3 +- src/lib/worker_base.ts | 2 +- src/test/eth/fetch_events.spec.ts | 7 +- src/test/eth/worker.spec.ts | 33 ++++--- 10 files changed, 127 insertions(+), 85 deletions(-) rename src/blockchains/{receipts/lib/helper.js => eth/lib/helper_receipts.ts} (67%) diff --git a/src/blockchains/construct_worker.ts b/src/blockchains/construct_worker.ts index 3c266cf5..ad1062fe 100644 --- a/src/blockchains/construct_worker.ts +++ b/src/blockchains/construct_worker.ts @@ -6,7 +6,6 @@ import { ETHWorker } from './eth/eth_worker'; import { ETHBlocksWorker } from './eth_blocks/eth_blocks_worker'; import { ETHContractsWorker } from './eth_contracts/eth_contracts_worker'; import { MaticWorker } from './matic/matic_worker'; -import { ReceiptsWorker } from './receipts/receipts_worker'; import { UTXOWorker } from './utxo/utxo_worker'; import { XRPWorker } from './xrp/xrp_worker'; @@ -22,8 +21,6 @@ export function constructWorker(blockchain: string, settings: any): BaseWorker { return new ETHBlocksWorker(settings); case 'matic': return new MaticWorker(settings); - case 'receipts': - return new ReceiptsWorker(settings); case 'utxo': return new UTXOWorker(settings); case 'xrp': diff --git a/src/blockchains/eth/eth_worker.ts b/src/blockchains/eth/eth_worker.ts index 8fdda996..0b604c9b 100644 --- a/src/blockchains/eth/eth_worker.ts +++ b/src/blockchains/eth/eth_worker.ts @@ -3,7 +3,7 @@ import { constructRPCClient } from '../../lib/http_client'; import { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } from './lib/dao_hack'; import { getGenesisTransfers } from './lib/genesis_transfers'; import { transactionOrder, stableSort } from './lib/util'; -import { BaseWorker } from '../../lib/worker_base'; +import { BaseWorker, WorkResultMultiMode } from '../../lib/worker_base'; import { Web3Interface, constructWeb3Wrapper, safeCastToNumber } from './lib/web3_wrapper'; import { decodeTransferTrace } from './lib/decode_transfers'; import { FeesDecoder } from './lib/fees_decoder'; @@ -11,15 +11,17 @@ import { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WO import { WithdrawalsDecoder } from './lib/withdrawals_decoder'; import { fetchEthInternalTrx, fetchBlocks, fetchReceipts } from './lib/fetch_data'; import { HTTPClientInterface } from '../../types'; -import { Trace, ETHBlock, ETHTransfer, ETHReceiptsMap } from './eth_types'; +import { Trace, ETHBlock, ETHTransfer, ETHReceipt } from './eth_types'; import { EOB, collectEndOfBlocks } from './lib/end_of_block'; - +import { assertIsDefined } from '../../lib/utils'; +import { decodeReceipt } from './lib/helper_receipts' export class ETHWorker extends BaseWorker { private web3Wrapper: Web3Interface; private ethClient: HTTPClientInterface; private feesDecoder: FeesDecoder; private withdrawalsDecoder: WithdrawalsDecoder; + private modes: string[]; constructor(settings: any) { super(settings); @@ -31,19 +33,19 @@ export class ETHWorker extends BaseWorker { this.feesDecoder = new FeesDecoder(this.web3Wrapper); this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3Wrapper); + this.modes = []; } - async fetchData(fromBlock: number, toBlock: number): Promise<[Trace[], Map, ETHReceiptsMap]> { - return await Promise.all([ - fetchEthInternalTrx(this.ethClient, this.web3Wrapper, fromBlock, toBlock), - fetchBlocks(this.ethClient, this.web3Wrapper, fromBlock, toBlock, true), - fetchReceipts(this.ethClient, this.web3Wrapper, - this.settings.RECEIPTS_API_METHOD, fromBlock, toBlock), - ]); + async fetchData(fromBlock: number, toBlock: number): Promise<[Trace[], Map, ETHReceipt[]]> { + const traces: Promise = this.isTracesNeeded() ? fetchEthInternalTrx(this.ethClient, this.web3Wrapper, fromBlock, toBlock) : Promise.resolve([]); + const blocks: Promise> = fetchBlocks(this.ethClient, this.web3Wrapper, fromBlock, toBlock, true); + const receipts: Promise = this.isReceiptsNeeded() ? fetchReceipts(this.ethClient, this.web3Wrapper, + this.settings.RECEIPTS_API_METHOD, fromBlock, toBlock) : Promise.resolve([]); + return await Promise.all([traces, blocks, receipts]); } transformPastEvents(fromBlock: number, toBlock: number, traces: Trace[], - blocks: any, receipts: ETHReceiptsMap): ETHTransfer[] { + blocks: any, receipts: ETHReceipt[]): ETHTransfer[] { let events: ETHTransfer[] = []; if (fromBlock === 0) { logger.info('Adding the GENESIS transfers'); @@ -78,7 +80,7 @@ export class ETHWorker extends BaseWorker { return result; } - transformPastTransactionEvents(blocks: ETHBlock[], receipts: ETHReceiptsMap): ETHTransfer[] { + transformPastTransactionEvents(blocks: ETHBlock[], receipts: ETHReceipt[]): ETHTransfer[] { const result: ETHTransfer[] = []; for (const block of blocks) { @@ -95,31 +97,71 @@ export class ETHWorker extends BaseWorker { return result; } - async work(): Promise<(ETHTransfer | EOB)[]> { + isReceiptsNeeded(): boolean { + return this.modes.includes(this.settings.NATIVE_TOKEN_MODE) || this.modes.includes(this.settings.RECEIPTS_MODE) + } + + isTracesNeeded(): boolean { + return this.modes.includes(this.settings.NATIVE_TOKEN_MODE) + } + + + async work(): Promise { + const result: WorkResultMultiMode = {}; const workerContext = await analyzeWorkerContext(this); setWorkerSleepTime(this, workerContext); - if (workerContext === NO_WORK_SLEEP) return []; + if (workerContext === NO_WORK_SLEEP) return result; const { fromBlock, toBlock } = nextIntervalCalculator(this); + logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`); - const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock); - let events: (ETHTransfer | EOB)[] = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts); - events.push(...collectEndOfBlocks(fromBlock, toBlock, blocks, this.web3Wrapper)) - if (events.length > 0) { - stableSort(events, transactionOrder); - extendEventsWithPrimaryKey(events, this.lastPrimaryKey); - this.lastPrimaryKey += events.length; - } + const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock); + this.lastExportedBlock = toBlock; - return events; + assertIsDefined(blocks, "Blocks are needed for extraction"); + if (this.modes.includes(this.settings.NATIVE_TOKEN_MODE)) { + assertIsDefined(traces, "Traces are needed for native token transfers"); + assertIsDefined(receipts, "Receipts are needed for native token transfers"); + + const events: (ETHTransfer | EOB)[] = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts); + + events.push(...collectEndOfBlocks(fromBlock, toBlock, blocks, this.web3Wrapper)) + if (events.length > 0) { + stableSort(events, transactionOrder); + extendEventsWithPrimaryKey(events, this.lastPrimaryKey); + + this.lastPrimaryKey += events.length; + } + result[this.settings.NATIVE_TOKEN_MODE] = events; + } + if (this.modes.includes(this.settings.RECEIPTS_MODE)) { + assertIsDefined(receipts, "Receipts are needed for receipts extraction"); + assertIsDefined(blocks, "Blocks are needed for extraction"); + const decodedReceipts = receipts.map((receipt: any) => decodeReceipt(receipt, this.web3Wrapper)); + decodedReceipts.forEach(receipt => { + const block = blocks.get(receipt.blockNumber) + assertIsDefined(block, `Block ${receipt.blockNumber} is missing`) + receipt['timestamp'] = block.timestamp; + }); + result[this.settings.RECEIPTS_MODE] = decodedReceipts; + } + + return result; } async init(): Promise { this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS; + + if (typeof this.settings.KAFKA_TOPIC === 'string') { + this.modes = [this.settings.NATIVE_TOKEN_MODE]; + } + else if (typeof this.settings.KAFKA_TOPIC === 'object') { + this.modes = Object.keys(this.settings.KAFKA_TOPIC); + } } } diff --git a/src/blockchains/eth/lib/constants.ts b/src/blockchains/eth/lib/constants.ts index 4f692688..1205bcea 100644 --- a/src/blockchains/eth/lib/constants.ts +++ b/src/blockchains/eth/lib/constants.ts @@ -10,5 +10,7 @@ export const BLOCK_INTERVAL = getIntEnvVariable('BLOCK_INTERVAL', 100); export const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts'; export const NODE_URL = process.env.NODE_URL || 'http://localhost:8545/'; export const LOOP_INTERVAL_CURRENT_MODE_SEC = getIntEnvVariable('LOOP_INTERVAL_CURRENT_MODE_SEC', 30); +export const NATIVE_TOKEN_MODE = 'native_token_transfers'; +export const RECEIPTS_MODE = 'receipts'; diff --git a/src/blockchains/eth/lib/fees_decoder.ts b/src/blockchains/eth/lib/fees_decoder.ts index 05dc1885..256d28b0 100644 --- a/src/blockchains/eth/lib/fees_decoder.ts +++ b/src/blockchains/eth/lib/fees_decoder.ts @@ -1,6 +1,6 @@ import assert from 'assert' import { Web3Interface, safeCastToNumber } from './web3_wrapper'; -import { ETHBlock, ETHTransaction, ETHTransfer, ETHReceiptsMap } from '../eth_types'; +import { ETHBlock, ETHTransaction, ETHTransfer, ETHReceipt, ETHReceiptsMap } from '../eth_types'; import { BURN_ADDRESS, LONDON_FORK_BLOCK } from './constants'; @@ -15,7 +15,7 @@ export class FeesDecoder { this.web3Wrapper = web3Wrapper; } - getPreLondonForkFees(transaction: ETHTransaction, block: ETHBlock, receipts: any): ETHTransfer[] { + getPreLondonForkFees(transaction: ETHTransaction, block: ETHBlock, receipts: ETHReceiptsMap): ETHTransfer[] { const gasExpense = BigInt(this.web3Wrapper.parseHexToNumber(transaction.gasPrice)) * BigInt(this.web3Wrapper.parseHexToNumber(receipts[transaction.hash].gasUsed)); return [{ @@ -94,15 +94,19 @@ export class FeesDecoder { return result; } - getFeesFromTransactionsInBlock(block: ETHBlock, blockNumber: number, receipts: ETHReceiptsMap, isETH: boolean): ETHTransfer[] { + getFeesFromTransactionsInBlock(block: ETHBlock, blockNumber: number, receipts: ETHReceipt[], isETH: boolean): ETHTransfer[] { const result: ETHTransfer[] = []; + const receiptsMap: ETHReceiptsMap = {}; + receipts.forEach((receipt: ETHReceipt) => { + receiptsMap[receipt.transactionHash] = receipt; + }); block.transactions.forEach((transaction: ETHTransaction | string) => { assert(isETHTransaction(transaction), "To get fees, ETH transaction should be expanded and not just the hash."); const feeTransfers: ETHTransfer[] = isETH && blockNumber >= LONDON_FORK_BLOCK ? - this.getPostLondonForkFees(transaction, block, receipts) : - this.getPreLondonForkFees(transaction, block, receipts); + this.getPostLondonForkFees(transaction, block, receiptsMap) : + this.getPreLondonForkFees(transaction, block, receiptsMap); result.push(...feeTransfers); }); diff --git a/src/blockchains/eth/lib/fetch_data.ts b/src/blockchains/eth/lib/fetch_data.ts index d602a17f..a5be0707 100644 --- a/src/blockchains/eth/lib/fetch_data.ts +++ b/src/blockchains/eth/lib/fetch_data.ts @@ -1,6 +1,6 @@ import { filterErrors } from './filter_errors'; import { Web3Interface } from './web3_wrapper'; -import { Trace, ETHBlock, ETHReceiptsMap, ETHReceipt } from '../eth_types'; +import { Trace, ETHBlock, ETHReceipt } from '../eth_types'; import { HTTPClientInterface } from '../../../types' @@ -43,7 +43,7 @@ export async function fetchBlocks(ethClient: HTTPClientInterface, } export async function fetchReceipts(ethClient: HTTPClientInterface, - web3Wrapper: Web3Interface, receiptsAPIMethod: string, fromBlock: number, toBlock: number): Promise { + web3Wrapper: Web3Interface, receiptsAPIMethod: string, fromBlock: number, toBlock: number): Promise { const batch: any[] = []; for (let currBlock = fromBlock; currBlock <= toBlock; currBlock++) { batch.push( @@ -54,19 +54,14 @@ export async function fetchReceipts(ethClient: HTTPClientInterface, ); } const finishedRequests = await ethClient.requestBulk(batch); - const result: ETHReceiptsMap = {}; - finishedRequests.forEach((response: any) => { + return finishedRequests.map((response: any) => { if (response.result) { - response.result.forEach((receipt: ETHReceipt) => { - result[receipt.transactionHash] = receipt; - }); + response.result; } else { throw new Error(JSON.stringify(response)); } }); - - return result; } diff --git a/src/blockchains/receipts/lib/helper.js b/src/blockchains/eth/lib/helper_receipts.ts similarity index 67% rename from src/blockchains/receipts/lib/helper.js rename to src/blockchains/eth/lib/helper_receipts.ts index a0e255e2..547ea6b5 100644 --- a/src/blockchains/receipts/lib/helper.js +++ b/src/blockchains/eth/lib/helper_receipts.ts @@ -1,9 +1,13 @@ +import { ETHReceipt } from "../eth_types"; + const lang = require('lodash/lang'); const array = require('lodash/array'); const object = require('lodash/object'); const collection = require('lodash/collection'); -const parseReceipts = (responses) => { +import { Web3Interface } from './web3_wrapper'; + +/*const parseReceipts = (responses) => { const receipts = responses.map((response) => response['result']); return array.compact(array.flatten(receipts)); }; @@ -16,13 +20,13 @@ const parseTransactionReceipts = (responses) => { const receipts = responses.map((response) => response['result']); return receipts; }; - -const decodeLog = (log, web3Wrapper) => { +*/ +const decodeLog = (log: any, web3Wrapper: Web3Interface) => { collection.forEach(['blockNumber', 'blockHash', 'transactionHash', 'transactionIndex'], - key => object.unset(log, key)); + (key: string) => object.unset(log, key)); collection.forEach(['logIndex', 'transactionLogIndex'], - key => { + (key: string) => { if (Object.prototype.hasOwnProperty.call(log, key) && log[key] !== undefined) { log[key] = web3Wrapper.parseHexToNumber(log[key]); } @@ -35,22 +39,22 @@ const decodeLog = (log, web3Wrapper) => { return log; }; -const columnizeLogs = (logs, web3Wrapper) => { +const columnizeLogs = (logs: any[], web3Wrapper: Web3Interface) => { if (logs.length === 0) { return []; } - const decodedLogs = collection.map(logs, log => decodeLog(log, web3Wrapper)); + const decodedLogs = collection.map(logs, (log: any) => decodeLog(log, web3Wrapper)); const logKeys = object.keys(decodedLogs[0]); - const result = {}; - collection.forEach(logKeys, key => result[`logs.${key}`] = decodedLogs.map(log => log[key])); + const result: any = {}; + collection.forEach(logKeys, (key: string) => result[`logs.${key}`] = decodedLogs.map((log: any) => log[key])); return result; }; -const decodeReceipt = (receipt, web3Wrapper) => { +export function decodeReceipt(receipt: ETHReceipt, web3Wrapper: Web3Interface) { const clonedReceipt = lang.cloneDeep(receipt); collection.forEach(['blockNumber', 'status', 'transactionIndex'], - key => { + (key: string) => { if (Object.prototype.hasOwnProperty.call(clonedReceipt, key) && clonedReceipt[key] !== undefined) { clonedReceipt[key] = web3Wrapper.parseHexToNumber(clonedReceipt[key]); } @@ -61,7 +65,7 @@ const decodeReceipt = (receipt, web3Wrapper) => { ); collection.forEach(['cumulativeGasUsed', 'gasUsed'], - key => clonedReceipt[key] = web3Wrapper.parseHexToNumberString(clonedReceipt[key]) + (key: string) => clonedReceipt[key] = web3Wrapper.parseHexToNumberString(clonedReceipt[key]) ); object.merge(clonedReceipt, columnizeLogs(clonedReceipt['logs'], web3Wrapper)); @@ -70,7 +74,7 @@ const decodeReceipt = (receipt, web3Wrapper) => { return clonedReceipt; }; -const decodeBlock = (block, web3Wrapper) => { +/*const decodeBlock = (block, web3Wrapper) => { return { timestamp: web3Wrapper.parseHexToNumber(block.timestamp), number: web3Wrapper.parseHexToNumber(block.number) @@ -82,19 +86,8 @@ const prepareBlockTimestampsObject = (blocks) => { for (const block of blocks) { obj[block.number] = block.timestamp; } return obj; -}; +};*/ + + -const setReceiptsTimestamp = async (receipts, timestamps) => { - return collection.forEach(receipts, receipt => receipt['timestamp'] = timestamps[receipt.blockNumber]); -}; -module.exports = { - parseReceipts, - parseBlocks, - parseTransactionReceipts, - decodeLog, - decodeReceipt, - decodeBlock, - prepareBlockTimestampsObject, - setReceiptsTimestamp -}; diff --git a/src/blockchains/receipts/receipts_worker.ts b/src/blockchains/receipts/receipts_worker.ts index f21dca8f..1d523d1b 100644 --- a/src/blockchains/receipts/receipts_worker.ts +++ b/src/blockchains/receipts/receipts_worker.ts @@ -1,5 +1,5 @@ 'use strict'; -import helper from './lib/helper'; +/*import helper from './lib/helper'; import { logger } from '../../lib/logger'; import { constructRPCClient } from '../../lib/http_client'; import { BaseWorker } from '../../lib/worker_base'; @@ -105,3 +105,4 @@ export class ReceiptsWorker extends BaseWorker { } +*/ \ No newline at end of file diff --git a/src/lib/worker_base.ts b/src/lib/worker_base.ts index 0ab435bd..2e684bed 100644 --- a/src/lib/worker_base.ts +++ b/src/lib/worker_base.ts @@ -4,7 +4,7 @@ import { KafkaStorage } from './kafka_storage'; import { ExporterPosition } from '../types' export type WorkResult = any[] -export type WorkResultMultiMode = Map +export type WorkResultMultiMode = { [key: string]: WorkResult; } export class BaseWorker { public lastExportTime: number; diff --git a/src/test/eth/fetch_events.spec.ts b/src/test/eth/fetch_events.spec.ts index cf78082a..781db6a5 100644 --- a/src/test/eth/fetch_events.spec.ts +++ b/src/test/eth/fetch_events.spec.ts @@ -3,7 +3,7 @@ import { ETHWorker } from '../../blockchains/eth/eth_worker'; import * as constants from '../../blockchains/eth/lib/constants'; import { injectDAOHackTransfers, DAO_HACK_ADDRESSES, DAO_HACK_FORK_BLOCK } from '../../blockchains/eth/lib/dao_hack'; import { Web3Interface, constructWeb3WrapperNoCredentials } from '../../blockchains/eth/lib/web3_wrapper'; -import { ETHBlock, ETHReceiptsMap, ETHTransfer } from '../../blockchains/eth/eth_types'; +import { ETHBlock, ETHReceipt, ETHTransfer } from '../../blockchains/eth/eth_types'; describe('fetch past events', function () { const transaction = { @@ -45,8 +45,7 @@ describe('fetch past events', function () { transactions: [transaction], }); - const receipts: ETHReceiptsMap = { - '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d': + const receipts: ETHReceipt[] = [ { 'blockHash': '0x22854625d4c18b3034461851a6fb181209e77a242adbd923989e7113a60fec56', 'blockNumber': '0x572559', @@ -80,7 +79,7 @@ describe('fetch past events', function () { 'transactionHash': '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d', 'transactionIndex': '0x0' } - }; + ]; const trace = { 'action': { 'callType': 'call', diff --git a/src/test/eth/worker.spec.ts b/src/test/eth/worker.spec.ts index e47cb1c2..ad70f904 100644 --- a/src/test/eth/worker.spec.ts +++ b/src/test/eth/worker.spec.ts @@ -1,22 +1,28 @@ import assert from 'assert'; +const sinon = require('sinon'); import v8 from 'v8'; import { extendEventsWithPrimaryKey, ETHWorker } from '../../blockchains/eth/eth_worker'; import { EOB } from '../../blockchains/eth/lib/end_of_block'; import * as constants from '../../blockchains/eth/lib/constants'; import { Trace, ETHBlock, ETHTransfer, ETHReceiptsMap } from '../../blockchains/eth/eth_types'; +import { WorkResultMultiMode } from '../../lib/worker_base'; import { expect } from 'earl' +import { MockWeb3Wrapper } from '../eth/mock_web3_wrapper'; + describe('Test worker', function () { let feeResult: ETHTransfer; let callResult: ETHTransfer; let endOfBlock: EOB; let eobWithPrimaryKey: EOB & { primaryKey: number }; + // This will construct the worker in the 'native token' mode + (constants as any).KAFKA_TOPIC = { [constants.NATIVE_TOKEN_MODE]: 'topic_name_not_used' } let worker = new ETHWorker(constants); let blockInfos = new Map() let feeResultWithPrimaryKey: ETHTransfer; let callResultWithPrimaryKey: ETHTransfer; - beforeEach(function () { + beforeEach(async function () { feeResult = { from: '0x03b16ab6e23bdbeeab719d8e4c49d63674876253', to: '0x829bd824b016326a401d083b33d092293333a830', @@ -52,20 +58,23 @@ describe('Test worker', function () { blockInfos.set(5711191, ethBlockEvent(5711191)) blockInfos.set(5711192, ethBlockEvent(5711192)) blockInfos.set(5711193, ethBlockEvent(5711193)) + + sinon.stub(worker, 'web3Wrapper').value(new MockWeb3Wrapper(1)) + + await worker.init(); }); - it('test primary key assignment', async function () { + it('test primary key assignment 1', async function () { let events = [feeResult, callResult] extendEventsWithPrimaryKey(events, 0) - expect(events).toLooseEqual([feeResultWithPrimaryKey, callResultWithPrimaryKey]); // Overwrite variables and methods that the 'work' method would use internally. worker.lastConfirmedBlock = 5711193; worker.lastExportedBlock = 5711192; worker.fetchData = async function (from: number, to: number) { - return Promise.resolve([[], blockInfos, {}]); + return Promise.resolve([[], blockInfos, []]); }; worker.transformPastEvents = function () { return [feeResult, callResult]; @@ -73,31 +82,31 @@ describe('Test worker', function () { const result = await worker.work(); - expect(result).toLooseEqual([feeResultWithPrimaryKey, callResultWithPrimaryKey, eobWithPrimaryKey]); + expect(result[constants.NATIVE_TOKEN_MODE]).toLooseEqual([feeResultWithPrimaryKey, callResultWithPrimaryKey, eobWithPrimaryKey]); }); - + it('test end of block events', async function () { worker.lastConfirmedBlock = 5711193; worker.lastExportedBlock = 5711190; + worker.fetchData = async function () { - return Promise.resolve([[], blockInfos, {}]); + return Promise.resolve([[], blockInfos, []]); }; worker.transformPastEvents = function () { return [feeResult, callResult]; }; const result = await worker.work(); - // input event is for block 5711193 // last exported block 5711190 // so there should be 3 EOB - const blocks = result.map((value) => value.blockNumber); - const types = result.map((value) => value.type); + const blocks = result[constants.NATIVE_TOKEN_MODE].map((value: ETHTransfer) => value.blockNumber); + const types = result[constants.NATIVE_TOKEN_MODE].map((value: ETHTransfer) => value.type); expect(blocks).toEqual([5711191, 5711192, 5711193, 5711193, 5711193]); expect(types).toEqual(["EOB", "EOB", "fee", "call", "EOB"]); }) - it('test primary key assignment', async function () { + it('test primary key assignment 2', async function () { const events = [feeResult, callResult] extendEventsWithPrimaryKey(events, 0) @@ -117,7 +126,7 @@ function ethBlockEvent(blockNumber: number): ETHBlock { totalDifficulty: "3", difficulty: "2", size: '2', - transactions: [] + transactions: [] } satisfies ETHBlock } From 6263bef6584160a589592075d3b88cd7fb1810d0 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Tue, 23 Jul 2024 14:59:26 +0300 Subject: [PATCH 5/8] Construct KafkaStorages correctly. Test fixes. --- src/blockchains/eth/lib/helper_receipts.ts | 1 - src/main.ts | 14 ++-- src/test/eth/fees_decoder.spec.ts | 17 ++--- src/test/index.test.ts | 23 +++--- src/test/receipts/helper.spec.ts | 81 +--------------------- 5 files changed, 28 insertions(+), 108 deletions(-) diff --git a/src/blockchains/eth/lib/helper_receipts.ts b/src/blockchains/eth/lib/helper_receipts.ts index 547ea6b5..e425c0b9 100644 --- a/src/blockchains/eth/lib/helper_receipts.ts +++ b/src/blockchains/eth/lib/helper_receipts.ts @@ -1,7 +1,6 @@ import { ETHReceipt } from "../eth_types"; const lang = require('lodash/lang'); -const array = require('lodash/array'); const object = require('lodash/object'); const collection = require('lodash/collection'); diff --git a/src/main.ts b/src/main.ts index 131971d2..afcfd255 100644 --- a/src/main.ts +++ b/src/main.ts @@ -34,16 +34,22 @@ export class Main { this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic); this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); } - else if (kafkaTopic instanceof Map) { - this.kafkaStorage = new Map(Array.from(kafkaTopic, ([mode, topic]) => [mode, new KafkaStorage(exporterName, isTransactions, topic)])) - const kafkaTopicConcat = Array.from(kafkaTopic.keys()).join('-') + else if (typeof kafkaTopic === 'object') { + this.kafkaStorage = Object.entries(kafkaTopic).reduce((acc, [key, value]) => { + acc.set(key, new KafkaStorage(exporterName, isTransactions, value)); + return acc; + }, new Map()); + const kafkaTopicConcat = Array.from(Object.keys(kafkaTopic)).join('-') this.zookeeperState = new ZookeeperState(exporterName, kafkaTopicConcat); } else { - throw new Error(`kafkaTopic variable should be either string or Map. It is: ${kafkaTopic}`); + throw new Error(`kafkaTopic variable should be either string or an object. It is: ${kafkaTopic}`); } const kafkaStoragesArray = (this.kafkaStorage instanceof Map) ? Array.from(this.kafkaStorage.values()) : [this.kafkaStorage] + if (kafkaStoragesArray.length === 0) { + throw new Error("At least one KafkaStorage needs to be constructed") + } await Promise.all(kafkaStoragesArray.map(storage => storage.connect().then(() => storage.initTransactions()))) await this.zookeeperState.connect(); } diff --git a/src/test/eth/fees_decoder.spec.ts b/src/test/eth/fees_decoder.spec.ts index 4e996017..047c32bc 100644 --- a/src/test/eth/fees_decoder.spec.ts +++ b/src/test/eth/fees_decoder.spec.ts @@ -156,15 +156,6 @@ const receipts_json_pre_london: ETHReceipt[] = [{ }]; -function turnReceiptsToMap(receipts: any[]) { - const result: any = {}; - receipts.forEach((receipt: any) => { - result[receipt.transactionHash] = receipt; - }); - - return result; -} - describe('Fees decoder test', function () { const web3Wrapper: Web3Interface = constructWeb3WrapperNoCredentials(constants.NODE_URL); const feesDecoder = new FeesDecoder(web3Wrapper); @@ -172,7 +163,7 @@ describe('Fees decoder test', function () { it('test fees post London zero priority', async function () { const postLondonFees = feesDecoder.getFeesFromTransactionsInBlock(block_json_post_london_zero_priority, safeCastToNumber(web3Wrapper.parseHexToNumber(block_json_post_london_zero_priority.number)), - turnReceiptsToMap(receipts_json_post_london_no_priority), true); + receipts_json_post_london_no_priority, true); const expected = [{ from: '0xea674fdde714fd979de3edf0f56aa9716b898ec8', @@ -192,7 +183,7 @@ describe('Fees decoder test', function () { it('test fees post London with priority', async function () { const postLondonFees = feesDecoder.getFeesFromTransactionsInBlock(block_json_post_london_with_priority, safeCastToNumber(web3Wrapper.parseHexToNumber(block_json_post_london_with_priority.number)), - turnReceiptsToMap(receipts_json_post_london_with_priority), true); + receipts_json_post_london_with_priority, true); const expected = [{ blockNumber: 13447057, @@ -222,7 +213,7 @@ describe('Fees decoder test', function () { it('test old type fees post London', async function () { const postLondonFees = feesDecoder.getFeesFromTransactionsInBlock(block_json_post_london_old_tx_type, safeCastToNumber(web3Wrapper.parseHexToNumber(block_json_post_london_old_tx_type.number)), - turnReceiptsToMap(receipts_json_post_london_old_tx_type), true); + receipts_json_post_london_old_tx_type, true); const expected = [{ blockNumber: 13318440, @@ -252,7 +243,7 @@ describe('Fees decoder test', function () { it('test fees pre London', async function () { const preLondonFees = feesDecoder.getFeesFromTransactionsInBlock(block_json_pre_london, safeCastToNumber(web3Wrapper.parseHexToNumber(block_json_pre_london.number)), - turnReceiptsToMap(receipts_json_pre_london), true); + receipts_json_pre_london, true); const expected = [{ from: '0x39fa8c5f2793459d6622857e7d9fbb4bd91766d3', diff --git a/src/test/index.test.ts b/src/test/index.test.ts index f1a760e4..bb96b65c 100644 --- a/src/test/index.test.ts +++ b/src/test/index.test.ts @@ -34,9 +34,7 @@ describe('Main tests', () => { }); it('initExporter returns error when Exporter connect() fails', async () => { - sinon - .stub(zkClientAsync.prototype, 'connectAsync') - .rejects(new Error('Exporter connection failed')); + sandbox.stub(zkClientAsync.prototype, 'connectAsync').rejects(new Error('Exporter connection failed')); const mainInstance = new Main(); @@ -61,6 +59,7 @@ describe('Main tests', () => { it('initExporter returns error when Exporter initTransactions() fails', async () => { sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); sandbox.stub(KafkaStorage.prototype, 'initTransactions').rejects(new Error('Exporter initTransactions failed')); const mainInstance = new Main(); @@ -88,7 +87,7 @@ describe('Main tests', () => { mainInstance.zookeeperState = zookeeperStub; mainInstance.worker = new BaseWorker(constants); - sinon.spy(mainInstance, 'handleInitPosition'); + sandbox.spy(mainInstance, 'handleInitPosition'); await mainInstance.handleInitPosition(); assert(mainInstance.handleInitPosition.calledOnce); @@ -104,7 +103,7 @@ describe('Main tests', () => { mainInstance.zookeeperState = zookeeperStub; mainInstance.worker = new BaseWorker(constants); - sinon.spy(mainInstance, 'handleInitPosition'); + sandbox.spy(mainInstance, 'handleInitPosition'); await mainInstance.handleInitPosition(); assert(mainInstance.handleInitPosition.calledOnce); @@ -117,8 +116,8 @@ describe('Main tests', () => { zookeeperStub.getLastPosition.throws(new Error('Exporter getLastPosition failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'zookeeperState').value(zookeeperStub); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'zookeeperState').value(zookeeperStub); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { await mainInstance.handleInitPosition(); @@ -139,8 +138,8 @@ describe('Main tests', () => { zookeeperStub.savePosition.throws(new Error('Exporter savePosition failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'zookeeperState').value(zookeeperStub); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'zookeeperState').value(zookeeperStub); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { await mainInstance.handleInitPosition(); @@ -413,8 +412,8 @@ describe('main function', () => { await main(); - sinon.assert.calledOnce(initStub); - sinon.assert.calledOnce(workLoopStub); - sinon.assert.calledOnce(disconnectStub); + sandbox.assert.calledOnce(initStub); + sandbox.assert.calledOnce(workLoopStub); + sandbox.assert.calledOnce(disconnectStub); }); }); diff --git a/src/test/receipts/helper.spec.ts b/src/test/receipts/helper.spec.ts index 86709667..9f0a93d6 100644 --- a/src/test/receipts/helper.spec.ts +++ b/src/test/receipts/helper.spec.ts @@ -1,77 +1,10 @@ import assert from 'assert'; -import helper from '../../blockchains/receipts/lib/helper'; +import { decodeReceipt } from '../../blockchains/eth/lib/helper_receipts'; import { Web3Interface, constructWeb3WrapperNoCredentials } from '../../blockchains/eth/lib/web3_wrapper'; import { NODE_URL } from '../../blockchains/eth/lib/constants'; const web3Wrapper: Web3Interface = constructWeb3WrapperNoCredentials(NODE_URL); -describe('blocks parsing', () => { - it('parses blocks', () => { - const responses = [ - { - jsonrpc: '2.0', - result: { timestamp: '0x56c097f1', number: '0xf53d5' } - }, - { - jsonrpc: '2.0', - result: { timestamp: '0x56c097f4', number: '0xf5408' } - } - ]; - - const result = helper.parseBlocks(responses); - assert.deepStrictEqual(result, [ - { timestamp: '0x56c097f1', number: '0xf53d5' }, - { timestamp: '0x56c097f4', number: '0xf5408' } - ]); - }); -}); - -describe('receipt parsing', () => { - it('parses receipts', () => { - const responses = [ - { - jsonrpc: '2.0', - result: [ - { - blockHash: '0x209bc40be9e6961d88435382b91754b7a6e180d6cbf9120a61246e1d2506f3a6', - blockNumber: '0xf4fbb', - contractAddress: null, - cumulativeGasUsed: '0x5208', - from: '0x2a65aca4d5fc5b5c859090a6c34d164135398226', - gasUsed: '0x5208', - logs: [], - logsBloom: '0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000', - root: '0x1806fd9f2ef8bf8dce03665b4c80a1740efe4194f90864c662e7af6a80a02a08', - to: '0xe33977e292ccef99ea8828733e97562f3690a8ad', - transactionHash: '0x88217032c83348c7aae522090d7a5b932609860a5f6760e98e9048f6ddc55ad8', - transactionIndex: '0x0' - } - ] - } - ]; - - const result = helper.parseReceipts(responses); - assert.deepStrictEqual(result, - [ - { - blockHash: '0x209bc40be9e6961d88435382b91754b7a6e180d6cbf9120a61246e1d2506f3a6', - blockNumber: '0xf4fbb', - contractAddress: null, - cumulativeGasUsed: '0x5208', - from: '0x2a65aca4d5fc5b5c859090a6c34d164135398226', - gasUsed: '0x5208', - logs: [], - logsBloom: '0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000', - root: '0x1806fd9f2ef8bf8dce03665b4c80a1740efe4194f90864c662e7af6a80a02a08', - to: '0xe33977e292ccef99ea8828733e97562f3690a8ad', - transactionHash: '0x88217032c83348c7aae522090d7a5b932609860a5f6760e98e9048f6ddc55ad8', - transactionIndex: '0x0' - } - ] - ); - }); -}); - context('receipt without logs', () => { const receipt = { @@ -91,7 +24,7 @@ context('receipt without logs', () => { describe('receipt decoding', () => { it('converts blockNumber from hex to number', () => { - const result = helper.decodeReceipt(receipt, web3Wrapper); + const result = decodeReceipt(receipt, web3Wrapper); const expected = { blockHash: '0x209bc40be9e6961d88435382b91754b7a6e180d6cbf9120a61246e1d2506f3a6', @@ -171,19 +104,11 @@ context('receipt with logs', () => { transactionIndex: 4 }; - const result = helper.decodeReceipt(receipt, web3Wrapper); + const result = decodeReceipt(receipt, web3Wrapper); assert.deepStrictEqual(result, expected); }); }); }); -describe('setting reciept\'s timestamp', () => { - it('sets receipt\'s timestamp', async () => { - const receipt = { blockNumber: 1004250 }; - const timestamps = { '1004250': 1455576747 }; - const result = await helper.setReceiptsTimestamp([receipt], timestamps); - assert.deepStrictEqual(result, [{ blockNumber: 1004250, timestamp: 1455576747 }]); - }); -}); From d34e0b39a17a52353446caf3eddfd608bfd40480 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Tue, 23 Jul 2024 18:27:59 +0300 Subject: [PATCH 6/8] Fetch receipts fixes --- bin/export_env_vars.sh | 5 +++-- src/blockchains/eth/eth_worker.ts | 21 +++++++++++------- src/blockchains/eth/lib/fetch_data.ts | 14 ++++++++---- src/main.ts | 31 ++++++++++++++++----------- src/test/index.test.ts | 1 - 5 files changed, 45 insertions(+), 27 deletions(-) diff --git a/bin/export_env_vars.sh b/bin/export_env_vars.sh index 1624a4f3..c6aea074 100755 --- a/bin/export_env_vars.sh +++ b/bin/export_env_vars.sh @@ -2,11 +2,12 @@ export KAFKA_URL=kafka-hz.stage.san:30911 export ZOOKEEPER_URL=zookeeper-hz.stage.san:30921 export NODE_URL=https://ethereum.santiment.net export START_BLOCK="15676731" -export BLOCK_INTERVAL="50" +export BLOCK_INTERVAL="5" export EXPORT_TIMEOUT_MLS=300000 export CONTRACT_MODE="extract_exact_overwrite" export BLOCKCHAIN="eth" -export KAFKA_TOPIC="erc20_exporter_test_topic" +#export KAFKA_TOPIC="erc20_exporter_test_topic" +export KAFKA_TOPIC='native_token_transfers:erc20_exporter_test_topic' export CARDANO_GRAPHQL_URL=https://cardano.santiment.net export ZOOKEEPER_SESSION_TIMEOUT=20000 export IS_ETH=false diff --git a/src/blockchains/eth/eth_worker.ts b/src/blockchains/eth/eth_worker.ts index 0b604c9b..6fbe9cde 100644 --- a/src/blockchains/eth/eth_worker.ts +++ b/src/blockchains/eth/eth_worker.ts @@ -3,7 +3,7 @@ import { constructRPCClient } from '../../lib/http_client'; import { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } from './lib/dao_hack'; import { getGenesisTransfers } from './lib/genesis_transfers'; import { transactionOrder, stableSort } from './lib/util'; -import { BaseWorker, WorkResultMultiMode } from '../../lib/worker_base'; +import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base'; import { Web3Interface, constructWeb3Wrapper, safeCastToNumber } from './lib/web3_wrapper'; import { decodeTransferTrace } from './lib/decode_transfers'; import { FeesDecoder } from './lib/fees_decoder'; @@ -44,8 +44,7 @@ export class ETHWorker extends BaseWorker { return await Promise.all([traces, blocks, receipts]); } - transformPastEvents(fromBlock: number, toBlock: number, traces: Trace[], - blocks: any, receipts: ETHReceipt[]): ETHTransfer[] { + transformPastEvents(fromBlock: number, toBlock: number, traces: Trace[], blocks: any, receipts: ETHReceipt[]): ETHTransfer[] { let events: ETHTransfer[] = []; if (fromBlock === 0) { logger.info('Adding the GENESIS transfers'); @@ -105,8 +104,7 @@ export class ETHWorker extends BaseWorker { return this.modes.includes(this.settings.NATIVE_TOKEN_MODE) } - - async work(): Promise { + async work(): Promise { const result: WorkResultMultiMode = {}; const workerContext = await analyzeWorkerContext(this); setWorkerSleepTime(this, workerContext); @@ -116,7 +114,6 @@ export class ETHWorker extends BaseWorker { logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`); - const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock); @@ -136,7 +133,14 @@ export class ETHWorker extends BaseWorker { this.lastPrimaryKey += events.length; } - result[this.settings.NATIVE_TOKEN_MODE] = events; + + if (this.modes.length === 1) { + // We are operating in single mode + return events; + } + else { + return result[this.settings.NATIVE_TOKEN_MODE] = events; + } } if (this.modes.includes(this.settings.RECEIPTS_MODE)) { assertIsDefined(receipts, "Receipts are needed for receipts extraction"); @@ -156,10 +160,11 @@ export class ETHWorker extends BaseWorker { async init(): Promise { this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS; - if (typeof this.settings.KAFKA_TOPIC === 'string') { + if (!this.settings.KAFKA_TOPIC.includes(":")) { this.modes = [this.settings.NATIVE_TOKEN_MODE]; } else if (typeof this.settings.KAFKA_TOPIC === 'object') { + // TODO convert to object in common function to also be used by KafkaStorage this.modes = Object.keys(this.settings.KAFKA_TOPIC); } } diff --git a/src/blockchains/eth/lib/fetch_data.ts b/src/blockchains/eth/lib/fetch_data.ts index a5be0707..5b5ed639 100644 --- a/src/blockchains/eth/lib/fetch_data.ts +++ b/src/blockchains/eth/lib/fetch_data.ts @@ -15,13 +15,14 @@ export function parseEthInternalTrx(result: Trace[]): Trace[] { ); } -export function fetchEthInternalTrx(ethClient: HTTPClientInterface, +export async function fetchEthInternalTrx(ethClient: HTTPClientInterface, web3Wrapper: Web3Interface, fromBlock: number, toBlock: number): Promise { const filterParams = { fromBlock: web3Wrapper.parseNumberToHex(fromBlock), toBlock: web3Wrapper.parseNumberToHex(toBlock) }; - return ethClient.request('trace_filter', [filterParams]).then((data: any) => parseEthInternalTrx(data['result'])); + const data: any = await ethClient.request('trace_filter', [filterParams]); + return parseEthInternalTrx(data['result']); } export async function fetchBlocks(ethClient: HTTPClientInterface, @@ -54,14 +55,19 @@ export async function fetchReceipts(ethClient: HTTPClientInterface, ); } const finishedRequests = await ethClient.requestBulk(batch); + const result: ETHReceipt[] = []; - return finishedRequests.map((response: any) => { + finishedRequests.forEach((response: any) => { if (response.result) { - response.result; + response.result.forEach((receipt: ETHReceipt) => { + result.push(receipt); + }); } else { throw new Error(JSON.stringify(response)); } }); + + return result; } diff --git a/src/main.ts b/src/main.ts index afcfd255..cfed1ee6 100644 --- a/src/main.ts +++ b/src/main.ts @@ -29,22 +29,29 @@ export class Main { )) } - async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string | Map) { - if (typeof kafkaTopic === 'string') { + async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string) { + if (!kafkaTopic.includes(':')) { + logger.info(`Constructing single Kafka producer`) this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic); this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); } - else if (typeof kafkaTopic === 'object') { - this.kafkaStorage = Object.entries(kafkaTopic).reduce((acc, [key, value]) => { - acc.set(key, new KafkaStorage(exporterName, isTransactions, value)); + else { + const keyValuePairs = kafkaTopic.split(','); + + this.kafkaStorage = keyValuePairs.reduce((acc, pair) => { + const [key, value] = pair.split(':'); + if (key && value) { + acc.set(key.trim(), new KafkaStorage(exporterName, isTransactions, value)); + } + else { + throw new Error(`key-value pair format is unexpected in KAFKA_TOPIC`); + } return acc; }, new Map()); - const kafkaTopicConcat = Array.from(Object.keys(kafkaTopic)).join('-') - this.zookeeperState = new ZookeeperState(exporterName, kafkaTopicConcat); - } else { - throw new Error(`kafkaTopic variable should be either string or an object. It is: ${kafkaTopic}`); - } + logger.info(`Constructed ${keyValuePairs.length} Kafka producers`) + this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); + } const kafkaStoragesArray = (this.kafkaStorage instanceof Map) ? Array.from(this.kafkaStorage.values()) : [this.kafkaStorage] if (kafkaStoragesArray.length === 0) { @@ -125,11 +132,11 @@ export class Main { await this.kafkaStorage.storeEvents(workResult, this.mergedConstants.WRITE_SIGNAL_RECORDS_KAFKA); } } - else if (workResult instanceof Map) { + else if (typeof workResult === 'object') { if (!(this.kafkaStorage instanceof Map)) { throw new Error('Worker returns data for multiple Kafka storages and single is defined') } - for (const [mode, data] of workResult.entries()) { + for (const [mode, data] of Object.entries(workResult)) { const kafkaStoragePerMode = this.kafkaStorage.get(mode) if (!kafkaStoragePerMode) { throw Error(`Workers returns data for mode ${mode} and no worker is defined for this mode`) diff --git a/src/test/index.test.ts b/src/test/index.test.ts index bb96b65c..bc32b622 100644 --- a/src/test/index.test.ts +++ b/src/test/index.test.ts @@ -289,7 +289,6 @@ describe('Main tests', () => { } async storeEvents() { - console.log("Store events mock") throw new Error('storeEvents failed'); } } From 09c33afc4d2a4b07ad629d8f13080e9a82ce91f7 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Wed, 24 Jul 2024 10:53:31 +0300 Subject: [PATCH 7/8] Test fixes. Move KAFKA_TOPIC parsing to util function. --- src/blockchains/eth/eth_worker.ts | 22 ++-- src/blockchains/receipts/lib/constants.ts | 10 -- src/blockchains/receipts/receipts_worker.ts | 108 -------------------- src/lib/utils.ts | 25 +++++ src/main.ts | 15 +-- src/test/eth/worker.spec.ts | 11 +- src/test/index.test.ts | 39 ++----- 7 files changed, 52 insertions(+), 178 deletions(-) delete mode 100644 src/blockchains/receipts/lib/constants.ts delete mode 100644 src/blockchains/receipts/receipts_worker.ts diff --git a/src/blockchains/eth/eth_worker.ts b/src/blockchains/eth/eth_worker.ts index 6fbe9cde..9ed1148b 100644 --- a/src/blockchains/eth/eth_worker.ts +++ b/src/blockchains/eth/eth_worker.ts @@ -13,7 +13,7 @@ import { fetchEthInternalTrx, fetchBlocks, fetchReceipts } from './lib/fetch_dat import { HTTPClientInterface } from '../../types'; import { Trace, ETHBlock, ETHTransfer, ETHReceipt } from './eth_types'; import { EOB, collectEndOfBlocks } from './lib/end_of_block'; -import { assertIsDefined } from '../../lib/utils'; +import { assertIsDefined, parseKafkaTopicToObject } from '../../lib/utils'; import { decodeReceipt } from './lib/helper_receipts' export class ETHWorker extends BaseWorker { @@ -104,7 +104,7 @@ export class ETHWorker extends BaseWorker { return this.modes.includes(this.settings.NATIVE_TOKEN_MODE) } - async work(): Promise { + async work(): Promise { const result: WorkResultMultiMode = {}; const workerContext = await analyzeWorkerContext(this); setWorkerSleepTime(this, workerContext); @@ -112,7 +112,7 @@ export class ETHWorker extends BaseWorker { const { fromBlock, toBlock } = nextIntervalCalculator(this); - logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`); + logger.info(`Fetching events for interval ${fromBlock}:${toBlock}`); const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock); @@ -134,13 +134,7 @@ export class ETHWorker extends BaseWorker { this.lastPrimaryKey += events.length; } - if (this.modes.length === 1) { - // We are operating in single mode - return events; - } - else { - return result[this.settings.NATIVE_TOKEN_MODE] = events; - } + result[this.settings.NATIVE_TOKEN_MODE] = events; } if (this.modes.includes(this.settings.RECEIPTS_MODE)) { assertIsDefined(receipts, "Receipts are needed for receipts extraction"); @@ -161,11 +155,11 @@ export class ETHWorker extends BaseWorker { this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS; if (!this.settings.KAFKA_TOPIC.includes(":")) { - this.modes = [this.settings.NATIVE_TOKEN_MODE]; + throw new Error("ETH worker, expects KAFKA_TOPIC in mode:name format") } - else if (typeof this.settings.KAFKA_TOPIC === 'object') { - // TODO convert to object in common function to also be used by KafkaStorage - this.modes = Object.keys(this.settings.KAFKA_TOPIC); + else { + const mapping = parseKafkaTopicToObject(this.settings.KAFKA_TOPIC) + this.modes = Object.keys(mapping); } } } diff --git a/src/blockchains/receipts/lib/constants.ts b/src/blockchains/receipts/lib/constants.ts deleted file mode 100644 index 232c64c9..00000000 --- a/src/blockchains/receipts/lib/constants.ts +++ /dev/null @@ -1,10 +0,0 @@ -export const DRY_RUN = parseInt(process.env.DRY_RUN || '1'); -export const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '50'); -export const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3'); -export const START_BLOCK = parseInt(process.env.START_BLOCK || '0'); -export const GET_RECEIPTS_ENDPOINT = process.env.GET_RECEIPTS_ENDPOINT || 'eth_getBlockReceipts'; -export const NODE_URL = process.env.NODE_URL || 'http://localhost:8545/'; -export const TRANSACTION = parseInt(process.env.TRANSACTION || '0'); -export const GET_BLOCK_ENDPOINT = process.env.GET_BLOCK_ENDPOINT || 'eth_getBlockByNumber'; -export const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30'); - diff --git a/src/blockchains/receipts/receipts_worker.ts b/src/blockchains/receipts/receipts_worker.ts deleted file mode 100644 index 1d523d1b..00000000 --- a/src/blockchains/receipts/receipts_worker.ts +++ /dev/null @@ -1,108 +0,0 @@ -'use strict'; -/*import helper from './lib/helper'; -import { logger } from '../../lib/logger'; -import { constructRPCClient } from '../../lib/http_client'; -import { BaseWorker } from '../../lib/worker_base'; -import { Web3Interface, constructWeb3Wrapper } from '../eth/lib/web3_wrapper'; -import { HTTPClientInterface } from '../../types'; -import { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WORK_SLEEP } from '../eth/lib/next_interval_calculator'; - - -export class ReceiptsWorker extends BaseWorker { - private web3Wrapper: Web3Interface; - private client: HTTPClientInterface; - - constructor(settings: any) { - super(settings); - - logger.info(`Connecting to node ${settings.NODE_URL}`); - this.web3Wrapper = constructWeb3Wrapper(settings.NODE_URL, settings.RPC_USERNAME, settings.RPC_PASSWORD); - this.client = constructRPCClient(settings.NODE_URL, settings.RPC_USERNAME, settings.RPC_PASSWORD, - settings.DEFAULT_TIMEOUT); - } - - async init() { - this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS; - } - - async fetchBlockTimestamps(fromBlock: number, toBlock: number) { - const batch = []; - for (let i = fromBlock; i < toBlock + 1; i++) { - batch.push( - this.client.generateRequest( - this.settings.GET_BLOCK_ENDPOINT, - [this.web3Wrapper.parseNumberToHex(i), - true] - ) - ); - } - - return this.client.requestBulk(batch).then((responses) => helper.parseBlocks(responses)); - } - - async fetchReceiptsFromTransaction(blocks: any[]) { - var batch = []; - for (let block = 0; block < blocks.length; block++) { - var transactions = blocks[block]['transactions']; - if (transactions.length === 0) continue; - for (let trx = 0; trx < transactions.length; trx++) { - var transactionHash = transactions[trx]['hash']; - batch.push( - this.client.generateRequest( - this.settings.GET_RECEIPTS_ENDPOINT, - [transactionHash] - ) - ); - } - } - return (!batch.length) ? [] : this.client.requestBulk(batch).then((responses) => helper.parseTransactionReceipts(responses)); - } - - async getReceiptsForBlocks(fromBlock: number, toBlock: number) { - logger.info(`Fetching blocks ${fromBlock}:${toBlock}`); - const blocks = await this.fetchBlockTimestamps(fromBlock, toBlock); - let receipts; - - if (!this.settings.TRANSACTION) { - receipts = await this.fetchReceipts(fromBlock, toBlock); - } - else { - receipts = await this.fetchReceiptsFromTransaction(blocks); - } - const decodedReceipts = receipts.map((receipt: any) => helper.decodeReceipt(receipt, this.web3Wrapper)); - const decodedBlocks = blocks.map((block: any) => helper.decodeBlock(block, this.web3Wrapper)); - const timestamps = helper.prepareBlockTimestampsObject(decodedBlocks); - - return helper.setReceiptsTimestamp(decodedReceipts, timestamps); - } - - async fetchReceipts(fromBlock: number, toBlock: number) { - const batch = []; - for (let i = fromBlock; i <= toBlock; i++) { - batch.push( - this.client.generateRequest( - this.settings.GET_RECEIPTS_ENDPOINT, - [this.web3Wrapper.parseNumberToHex(i)] - ) - ); - } - return this.client.requestBulk(batch).then((responses) => helper.parseReceipts(responses)); - } - - async work() { - const workerContext = await analyzeWorkerContext(this); - setWorkerSleepTime(this, workerContext); - if (workerContext === NO_WORK_SLEEP) return []; - - const { fromBlock, toBlock } = nextIntervalCalculator(this); - logger.info(`Fetching receipts for interval ${fromBlock}:${toBlock}`); - const receipts = await this.getReceiptsForBlocks(fromBlock, toBlock); - - this.lastExportedBlock = toBlock; - - return receipts; - } -} - - -*/ \ No newline at end of file diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 779762ef..8474fc5a 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -36,3 +36,28 @@ export function assertIsDefined(value: any, errorMsg: string): asserts value throw Error(errorMsg); } } + +export type ModeToKafkaTopic = { + [topicName: string]: string; +} + +export function parseKafkaTopicToObject(kafkaTopic: string): ModeToKafkaTopic { + const keyValuePairs = kafkaTopic.split(','); + + const result: ModeToKafkaTopic = keyValuePairs.reduce((acc, pair) => { + const [key, value] = pair.split(':'); + if (key && value) { + acc[key.trim()] = value; + } + else { + throw new Error(`key-value pair format is unexpected in KAFKA_TOPIC`); + } + return acc; + }, {} as ModeToKafkaTopic); + + if (Object.keys(result).length < 1) { + throw new Error(`Can not construct multi mode from ${kafkaTopic}`) + } + + return result; +} diff --git a/src/main.ts b/src/main.ts index cfed1ee6..f13f6d7a 100644 --- a/src/main.ts +++ b/src/main.ts @@ -11,6 +11,7 @@ import { EXPORT_TIMEOUT_MLS } from './lib/constants'; import { constructWorker } from './blockchains/construct_worker' import { ExporterPosition } from './types' import { BaseWorker, WorkResult, WorkResultMultiMode } from './lib/worker_base'; +import { parseKafkaTopicToObject } from './lib/utils'; export class Main { private worker!: BaseWorker; @@ -36,20 +37,14 @@ export class Main { this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); } else { - const keyValuePairs = kafkaTopic.split(','); + const mapping = parseKafkaTopicToObject(kafkaTopic) - this.kafkaStorage = keyValuePairs.reduce((acc, pair) => { - const [key, value] = pair.split(':'); - if (key && value) { - acc.set(key.trim(), new KafkaStorage(exporterName, isTransactions, value)); - } - else { - throw new Error(`key-value pair format is unexpected in KAFKA_TOPIC`); - } + this.kafkaStorage = Object.entries(mapping).reduce((acc, [mode, topicName]) => { + acc.set(mode, new KafkaStorage(exporterName, isTransactions, topicName)); return acc; }, new Map()); - logger.info(`Constructed ${keyValuePairs.length} Kafka producers`) + logger.info(`Constructed ${this.kafkaStorage.size} Kafka producers`) this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic); } diff --git a/src/test/eth/worker.spec.ts b/src/test/eth/worker.spec.ts index ad70f904..e9fc07c7 100644 --- a/src/test/eth/worker.spec.ts +++ b/src/test/eth/worker.spec.ts @@ -1,11 +1,9 @@ -import assert from 'assert'; const sinon = require('sinon'); import v8 from 'v8'; import { extendEventsWithPrimaryKey, ETHWorker } from '../../blockchains/eth/eth_worker'; import { EOB } from '../../blockchains/eth/lib/end_of_block'; import * as constants from '../../blockchains/eth/lib/constants'; -import { Trace, ETHBlock, ETHTransfer, ETHReceiptsMap } from '../../blockchains/eth/eth_types'; -import { WorkResultMultiMode } from '../../lib/worker_base'; +import { ETHBlock, ETHTransfer } from '../../blockchains/eth/eth_types'; import { expect } from 'earl' import { MockWeb3Wrapper } from '../eth/mock_web3_wrapper'; @@ -16,8 +14,11 @@ describe('Test worker', function () { let endOfBlock: EOB; let eobWithPrimaryKey: EOB & { primaryKey: number }; // This will construct the worker in the 'native token' mode - (constants as any).KAFKA_TOPIC = { [constants.NATIVE_TOKEN_MODE]: 'topic_name_not_used' } - let worker = new ETHWorker(constants); + const mergedConstants = { + KAFKA_TOPIC: `${constants.NATIVE_TOKEN_MODE}:topic_name_not_used`, + ...constants + }; + let worker = new ETHWorker(mergedConstants); let blockInfos = new Map() let feeResultWithPrimaryKey: ETHTransfer; let callResultWithPrimaryKey: ETHTransfer; diff --git a/src/test/index.test.ts b/src/test/index.test.ts index bc32b622..ed1421e4 100644 --- a/src/test/index.test.ts +++ b/src/test/index.test.ts @@ -20,7 +20,8 @@ describe('Main tests', () => { const constants = { START_BLOCK: -1, START_PRIMARY_KEY: -1, - BLOCKCHAIN: 'eth' + BLOCKCHAIN: 'eth', + KAFKA_TOPIC: 'NOT_USED' }; let sandbox: any = null; @@ -42,11 +43,7 @@ describe('Main tests', () => { sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); try { - const mergedConstants = { - KAFKA_TOPIC: 'NOT_USED', - ...constants - } - await mainInstance.init(mergedConstants); + await mainInstance.init(constants); } catch (err) { if (err instanceof Error) { assert.strictEqual(err.message, 'Exporter connection failed'); @@ -63,12 +60,8 @@ describe('Main tests', () => { sandbox.stub(KafkaStorage.prototype, 'initTransactions').rejects(new Error('Exporter initTransactions failed')); const mainInstance = new Main(); - const mergedConstants = { - KAFKA_TOPIC: 'NOT_USED', - ...constants - } try { - await mainInstance.init(mergedConstants); + await mainInstance.init(constants); } catch (err) { if (err instanceof Error) { assert.strictEqual(err.message, 'Exporter initTransactions failed'); @@ -161,12 +154,8 @@ describe('Main tests', () => { const mainInstance = new Main(); sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); - const mergedConstants = { - KAFKA_TOPIC: 'NOT_USED', - ...constants - } try { - await mainInstance.init(mergedConstants); + await mainInstance.init(constants); assert.fail('initWorker should have thrown an error'); } catch (err) { if (err instanceof Error) { @@ -184,15 +173,11 @@ describe('Main tests', () => { sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); const mainInstance = new Main(); - const mergedConstants = { - KAFKA_TOPIC: 'NOT_USED', - ...constants - } sandbox.stub(ETHWorker.prototype, 'init').rejects(new Error('Worker init failed')); try { - await mainInstance.init(mergedConstants); + await mainInstance.init(constants); assert.fail('initWorker should have thrown an error'); } catch (err) { if (err instanceof Error) { @@ -213,12 +198,8 @@ describe('Main tests', () => { sandbox.stub(mainInstance, 'handleInitPosition').throws(new Error('Error when initializing position')); - const mergedConstants = { - KAFKA_TOPIC: 'NOT_USED', - ...constants - } try { - await mainInstance.init(mergedConstants); + await mainInstance.init(constants); assert.fail('initWorker should have thrown an error'); } catch (err) { if (err instanceof Error) { @@ -244,11 +225,7 @@ describe('Main tests', () => { handleInitPositionStub.resolves(); - const mergedConstants = { - KAFKA_TOPIC: 'NOT_USED', - ...constants - } - await mainInstance.init(mergedConstants); + await mainInstance.init(constants); assert(handleInitPositionStub.calledOnce); }); From c63ada0345c93153bba398e4da44eac70ca67762 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Wed, 24 Jul 2024 12:08:49 +0300 Subject: [PATCH 8/8] Generate output for different modes in separate functions --- src/blockchains/eth/eth_types.ts | 12 +++++ src/blockchains/eth/eth_worker.ts | 59 +++++++++++++--------- src/blockchains/eth/lib/helper_receipts.ts | 33 +----------- 3 files changed, 50 insertions(+), 54 deletions(-) diff --git a/src/blockchains/eth/eth_types.ts b/src/blockchains/eth/eth_types.ts index a505999d..531fdac6 100644 --- a/src/blockchains/eth/eth_types.ts +++ b/src/blockchains/eth/eth_types.ts @@ -91,6 +91,18 @@ export type ETHReceipt = { transactionIndex: string } +export type ETHReceiptDecoded = { + blockNumber: number, + blockHash: string, + gasUsed: string, + transactionHash: string, + cumulativeGasUsed: string, + logs: any[], + transactionIndex: number, + status: number, + timestamp: string +} + export type ETHReceiptsMap = { [transactionHash: string]: ETHReceipt; } diff --git a/src/blockchains/eth/eth_worker.ts b/src/blockchains/eth/eth_worker.ts index 9ed1148b..6f1fafa1 100644 --- a/src/blockchains/eth/eth_worker.ts +++ b/src/blockchains/eth/eth_worker.ts @@ -104,6 +104,37 @@ export class ETHWorker extends BaseWorker { return this.modes.includes(this.settings.NATIVE_TOKEN_MODE) } + getTransfersOutput(fromBlock: number, toBlock: number, traces: Trace[], + blocks: Map, receipts: ETHReceipt[], endOfBlockEvents: EOB[]): WorkResult { + assertIsDefined(traces, "Traces are needed for native token transfers"); + assertIsDefined(receipts, "Receipts are needed for native token transfers"); + + const events: (ETHTransfer | EOB)[] = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts); + + + events.push(...endOfBlockEvents) + if (events.length > 0) { + stableSort(events, transactionOrder); + extendEventsWithPrimaryKey(events, this.lastPrimaryKey); + + this.lastPrimaryKey += events.length; + } + + return events; + } + + getReceiptsOutput(blocks: Map, receipts: ETHReceipt[]): WorkResult { + assertIsDefined(receipts, "Receipts are needed for receipts extraction"); + assertIsDefined(blocks, "Blocks are needed for extraction"); + const decodedReceipts = receipts.map((receipt: any) => decodeReceipt(receipt, this.web3Wrapper)); + decodedReceipts.forEach(receipt => { + const block = blocks.get(receipt.blockNumber) + assertIsDefined(block, `Block ${receipt.blockNumber} is missing`) + receipt.timestamp = block.timestamp; + }); + return decodedReceipts; + } + async work(): Promise { const result: WorkResultMultiMode = {}; const workerContext = await analyzeWorkerContext(this); @@ -120,32 +151,14 @@ export class ETHWorker extends BaseWorker { this.lastExportedBlock = toBlock; assertIsDefined(blocks, "Blocks are needed for extraction"); + // TODO consider if EOB events should also be present in other output topics + const endOfBlockEvents = collectEndOfBlocks(fromBlock, toBlock, blocks, this.web3Wrapper) if (this.modes.includes(this.settings.NATIVE_TOKEN_MODE)) { - assertIsDefined(traces, "Traces are needed for native token transfers"); - assertIsDefined(receipts, "Receipts are needed for native token transfers"); - - const events: (ETHTransfer | EOB)[] = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts); - - events.push(...collectEndOfBlocks(fromBlock, toBlock, blocks, this.web3Wrapper)) - if (events.length > 0) { - stableSort(events, transactionOrder); - extendEventsWithPrimaryKey(events, this.lastPrimaryKey); - - this.lastPrimaryKey += events.length; - } - - result[this.settings.NATIVE_TOKEN_MODE] = events; + result[this.settings.NATIVE_TOKEN_MODE] = this.getTransfersOutput(fromBlock, toBlock, traces, blocks, + receipts, endOfBlockEvents); } if (this.modes.includes(this.settings.RECEIPTS_MODE)) { - assertIsDefined(receipts, "Receipts are needed for receipts extraction"); - assertIsDefined(blocks, "Blocks are needed for extraction"); - const decodedReceipts = receipts.map((receipt: any) => decodeReceipt(receipt, this.web3Wrapper)); - decodedReceipts.forEach(receipt => { - const block = blocks.get(receipt.blockNumber) - assertIsDefined(block, `Block ${receipt.blockNumber} is missing`) - receipt['timestamp'] = block.timestamp; - }); - result[this.settings.RECEIPTS_MODE] = decodedReceipts; + result[this.settings.RECEIPTS_MODE] = this.getReceiptsOutput(blocks, receipts); } return result; diff --git a/src/blockchains/eth/lib/helper_receipts.ts b/src/blockchains/eth/lib/helper_receipts.ts index e425c0b9..87639f72 100644 --- a/src/blockchains/eth/lib/helper_receipts.ts +++ b/src/blockchains/eth/lib/helper_receipts.ts @@ -1,4 +1,4 @@ -import { ETHReceipt } from "../eth_types"; +import { ETHReceipt, ETHReceiptDecoded } from "../eth_types"; const lang = require('lodash/lang'); const object = require('lodash/object'); @@ -6,20 +6,6 @@ const collection = require('lodash/collection'); import { Web3Interface } from './web3_wrapper'; -/*const parseReceipts = (responses) => { - const receipts = responses.map((response) => response['result']); - return array.compact(array.flatten(receipts)); -}; - -const parseBlocks = (responses) => { - return responses.map((response) => response['result']); -}; - -const parseTransactionReceipts = (responses) => { - const receipts = responses.map((response) => response['result']); - return receipts; -}; -*/ const decodeLog = (log: any, web3Wrapper: Web3Interface) => { collection.forEach(['blockNumber', 'blockHash', 'transactionHash', 'transactionIndex'], (key: string) => object.unset(log, key)); @@ -49,7 +35,7 @@ const columnizeLogs = (logs: any[], web3Wrapper: Web3Interface) => { return result; }; -export function decodeReceipt(receipt: ETHReceipt, web3Wrapper: Web3Interface) { +export function decodeReceipt(receipt: ETHReceipt, web3Wrapper: Web3Interface): ETHReceiptDecoded { const clonedReceipt = lang.cloneDeep(receipt); collection.forEach(['blockNumber', 'status', 'transactionIndex'], @@ -73,20 +59,5 @@ export function decodeReceipt(receipt: ETHReceipt, web3Wrapper: Web3Interface) { return clonedReceipt; }; -/*const decodeBlock = (block, web3Wrapper) => { - return { - timestamp: web3Wrapper.parseHexToNumber(block.timestamp), - number: web3Wrapper.parseHexToNumber(block.number) - }; -}; - -const prepareBlockTimestampsObject = (blocks) => { - let obj = {}; - for (const block of blocks) { obj[block.number] = block.timestamp; } - - return obj; -};*/ - -