diff --git a/app/apps/onebox/src/polling.block/polling.block.service.ts b/app/apps/onebox/src/polling.block/polling.block.service.ts index e1674a5..c4b5246 100644 --- a/app/apps/onebox/src/polling.block/polling.block.service.ts +++ b/app/apps/onebox/src/polling.block/polling.block.service.ts @@ -2,7 +2,6 @@ import { TopicName } from '@app/utils/topicUtils'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { ClientKafka } from '@nestjs/microservices'; import { CronExpression, SchedulerRegistry } from '@nestjs/schedule'; -import { EthereumWorker } from 'apps/worker-service/src/worker/evm.worker'; import { CronJob } from 'cron'; import { ethers } from 'ethers'; import { BlockSyncService } from '../modules/blocksync/blocksync.service'; @@ -12,7 +11,7 @@ import { SupportedChain } from '@app/utils/supportedChain.util'; export class PollingBlockService { private detectInfo = { flag: false, blockNumber: 0 }; - private readonly logger = new Logger(EthereumWorker.name); + private readonly logger = new Logger(PollingBlockService.name); constructor( private schedulerRegistry: SchedulerRegistry, diff --git a/app/apps/worker-service/src/worker-service.controller.ts b/app/apps/worker-service/src/worker-service.controller.ts index 882f2ab..ac17e33 100644 --- a/app/apps/worker-service/src/worker-service.controller.ts +++ b/app/apps/worker-service/src/worker-service.controller.ts @@ -2,10 +2,14 @@ import { Controller, Get } from '@nestjs/common'; import { WorkerServiceService } from './worker-service.service'; import { EventPattern } from '@nestjs/microservices'; import { TopicName } from '@app/utils/topicUtils'; +import { EthereumWorker } from './worker/ethereum.worker'; @Controller() export class WorkerServiceController { - constructor(private readonly workerServiceService: WorkerServiceService) {} + constructor( + private readonly workerServiceService: WorkerServiceService, + private readonly ethereumWorker: EthereumWorker, + ) {} @Get() getHello(): string { @@ -14,11 +18,11 @@ export class WorkerServiceController { @EventPattern(TopicName.ETH_DETECTED_BLOCK) async ethDetectBlock(data: any) { - this.workerServiceService.ethHandleDetectedBlock(data); + this.ethereumWorker.ethHandleDetectedBlock(data); } @EventPattern(TopicName.ETH_CONFIRMED_BLOCK) async ethConfirmBlock(data: any) { - this.workerServiceService.ethHandleConfirmedBlock(data); + this.ethereumWorker.ethHandleConfirmedBlock(data); } } diff --git a/app/apps/worker-service/src/worker-service.module.ts b/app/apps/worker-service/src/worker-service.module.ts index 4baff5d..ab7f257 100644 --- a/app/apps/worker-service/src/worker-service.module.ts +++ b/app/apps/worker-service/src/worker-service.module.ts @@ -1,13 +1,11 @@ -import { DatabaseModule } from '@app/database'; -import { GlobalModule } from '@app/global'; import { Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { ClientsModule, Transport } from '@nestjs/microservices'; import { ScheduleModule } from '@nestjs/schedule'; import { BlockSyncModule } from './blocksync/blocksync.module'; import { WorkerServiceController } from './worker-service.controller'; +import { EthereumWorker } from './worker/ethereum.worker'; import { WorkerServiceService } from './worker-service.service'; -import { EthereumWorker } from './worker/evm.worker'; @Module({ imports: [ @@ -36,6 +34,6 @@ import { EthereumWorker } from './worker/evm.worker'; ScheduleModule.forRoot(), ], controllers: [WorkerServiceController], - providers: [WorkerServiceService], + providers: [WorkerServiceService, EthereumWorker], }) export class WorkerServiceModule {} diff --git a/app/apps/worker-service/src/worker-service.service.ts b/app/apps/worker-service/src/worker-service.service.ts index 50c1a41..925c45e 100644 --- a/app/apps/worker-service/src/worker-service.service.ts +++ b/app/apps/worker-service/src/worker-service.service.ts @@ -1,117 +1,15 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { ClientKafka } from '@nestjs/microservices'; -import { ethers } from 'ethers'; -import { BlockSyncService } from './blocksync/blocksync.service'; -import { TopicName } from '@app/utils/topicUtils'; @Injectable() export class WorkerServiceService { - private readonly logger = new Logger(WorkerServiceService.name); - rpcUrl: string; - provider: ethers.Provider; - constructor( @Inject('MONITOR_CLIENT_SERVICE') private monitorClient: ClientKafka, - private readonly blockSyncService: BlockSyncService, - ) { - if (process.env.ETH_PROVIDER_URL) { - this.rpcUrl = process.env.ETH_PROVIDER_URL; - this.provider = new ethers.JsonRpcProvider(process.env.ETH_PROVIDER_URL); - } - } + ) {} getHello(): string { console.log('Emit blockchain-event to kafka'); this.monitorClient.emit('blockchain-event', { userId: 'abc' }); return 'Hello World!'; } - - async ethHandleDetectedBlock(data: { blockNumber: number }) { - const blockNumber = data.blockNumber; - - try { - this.logger.debug(['DETECT', `handle block ${blockNumber}`]); - // handle native transfer - this.handleNativeTransfer(blockNumber, false); - // handle extracted event for erc20 and nft - this.handleLog(blockNumber, false); - //only update last sync for confirm - // await this.updateLastSyncBlock(blockNumber); - } catch (error) { - // @todo re-add error block into kafka, and save in db - this.logger.error([ - 'DETECT', - `Error scanning block ${blockNumber}:`, - error, - ]); - } - - return; - } - - async ethHandleConfirmedBlock(data: { blockNumber: number }) { - const blockNumber = data.blockNumber; - try { - this.logger.debug(['CONFIRM', `Scanning block ${blockNumber}`]); - // handle native transfer - this.handleNativeTransfer(blockNumber, true); - // handle extracted event for erc20 and nft - this.handleLog(blockNumber, true); - } catch (error) { - this.logger.error([ - 'CONFIRM', - `Error scanning block ${blockNumber}:`, - error, - ]); - } - return; - } - - private async handleLog( - blockNumber: number, - confirm: boolean, - ): Promise { - // Retrieve transfer event the block's logs - const logs = await this.provider.getLogs({ - fromBlock: blockNumber, - toBlock: blockNumber, - topics: [ - '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', - ], - }); - - // handle extracted event for erc20 and nft - logs.forEach((event) => { - if (event.topics.length === 3) { - // this.ethMonitorService.handleErc20Transfer(event, confirm); - this.monitorClient.emit(TopicName.ETH_ERC20_TRANSFER, { - event: event, - confirm: confirm, - }); - } else if (event.topics.length === 4) { - // this.ethMonitorService.handleErc721Transfer(event, confirm); - this.monitorClient.emit(TopicName.ETH_ERC721_TRANSFER, { - event: event, - confirm: confirm, - }); - } - }); - } - - private async handleNativeTransfer( - blockNumber: number, - confirm: boolean, - ): Promise { - // Retrieve all transaction in block - const block = await this.provider.getBlock(blockNumber, true); - - // handle extracted event for native - block.prefetchedTransactions.forEach((transaction) => { - // this.ethMonitorService.handleNativeTransfer(transaction, confirm); - this.monitorClient.emit(TopicName.ETH_NATIVE_TRANSFER, { - transaction: transaction, - confirm: confirm, - }); - }); - } } diff --git a/app/apps/worker-service/src/worker/ethereum.worker.ts b/app/apps/worker-service/src/worker/ethereum.worker.ts new file mode 100644 index 0000000..3b5af78 --- /dev/null +++ b/app/apps/worker-service/src/worker/ethereum.worker.ts @@ -0,0 +1,109 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { ethers } from 'ethers'; +import { TopicName } from '@app/utils/topicUtils'; + +@Injectable() +export class EthereumWorker { + private readonly logger = new Logger(EthereumWorker.name); + rpcUrl: string; + provider: ethers.Provider; + + constructor( + @Inject('MONITOR_CLIENT_SERVICE') private monitorClient: ClientKafka, + ) { + if (process.env.ETH_PROVIDER_URL) { + this.rpcUrl = process.env.ETH_PROVIDER_URL; + this.provider = new ethers.JsonRpcProvider(process.env.ETH_PROVIDER_URL); + } + } + + async ethHandleDetectedBlock(data: { blockNumber: number }) { + const blockNumber = data.blockNumber; + + try { + this.logger.debug(['DETECT', `handle block ${blockNumber}`]); + // handle native transfer + this.handleNativeTransfer(blockNumber, false); + // handle extracted event for erc20 and nft + this.handleLog(blockNumber, false); + //only update last sync for confirm + // await this.updateLastSyncBlock(blockNumber); + } catch (error) { + // @todo re-add error block into kafka, and save in db + this.logger.error([ + 'DETECT', + `Error scanning block ${blockNumber}:`, + error, + ]); + } + + return; + } + + async ethHandleConfirmedBlock(data: { blockNumber: number }) { + const blockNumber = data.blockNumber; + try { + this.logger.debug(['CONFIRM', `Scanning block ${blockNumber}`]); + // handle native transfer + this.handleNativeTransfer(blockNumber, true); + // handle extracted event for erc20 and nft + this.handleLog(blockNumber, true); + } catch (error) { + this.logger.error([ + 'CONFIRM', + `Error scanning block ${blockNumber}:`, + error, + ]); + } + return; + } + + private async handleLog( + blockNumber: number, + confirm: boolean, + ): Promise { + // Retrieve transfer event the block's logs + const logs = await this.provider.getLogs({ + fromBlock: blockNumber, + toBlock: blockNumber, + topics: [ + '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', + ], + }); + + // handle extracted event for erc20 and nft + logs.forEach((event) => { + if (event.topics.length === 3) { + // this.ethMonitorService.handleErc20Transfer(event, confirm); + this.monitorClient.emit(TopicName.ETH_ERC20_TRANSFER, { + event: event, + confirm: confirm, + }); + } else if (event.topics.length === 4) { + // this.ethMonitorService.handleErc721Transfer(event, confirm); + this.monitorClient.emit(TopicName.ETH_ERC721_TRANSFER, { + event: event, + confirm: confirm, + }); + } + }); + } + + private async handleNativeTransfer( + blockNumber: number, + confirm: boolean, + ): Promise { + // Retrieve all transaction in block + const block = await this.provider.getBlock(blockNumber, true); + + // handle extracted event for native + block.prefetchedTransactions.forEach((transaction) => { + // this.ethMonitorService.handleNativeTransfer(transaction, confirm); + this.monitorClient.emit(TopicName.ETH_NATIVE_TRANSFER, { + transaction: transaction, + confirm: confirm, + }); + }); + } +} diff --git a/app/apps/worker-service/src/worker/evm.worker.ts b/app/apps/worker-service/src/worker/evm.worker.ts deleted file mode 100644 index 9d2a0ec..0000000 --- a/app/apps/worker-service/src/worker/evm.worker.ts +++ /dev/null @@ -1,237 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { Cron, CronExpression } from '@nestjs/schedule'; -import { ethers } from 'ethers'; -import { BlockSyncService } from '../blocksync/blocksync.service'; -import { ClientKafka } from '@nestjs/microservices'; -import { TopicName } from '@app/utils/topicUtils'; - -interface ScanInfo { - flag: boolean; - blockNumber: number; -} - -@Injectable() -export class EthereumWorker { - private readonly logger = new Logger(EthereumWorker.name); - private readonly confirmBlock = 12; - private detectInfo: ScanInfo = { flag: false, blockNumber: 0 }; - private confirmInfo: ScanInfo = { flag: false, blockNumber: 0 }; - rpcUrl: string; - provider: ethers.Provider; - - @Inject(BlockSyncService) - private readonly blockSyncService: BlockSyncService; - - @Inject('MONITOR_CLIENT_SERVICE') - private readonly monitorClient: ClientKafka; - - enable: boolean; - - onModuleInit() { - console.log(`The module has been initialized.`); - if (process.env.EVM_DISABLE === 'true') { - this.detectInfo.flag = true; - this.confirmInfo.flag = true; - return; - } - this.rpcUrl = process.env.ETH_PROVIDER_URL; - this.provider = new ethers.JsonRpcProvider(process.env.ETH_PROVIDER_URL); - this.initWorker(); - } - - onApplicationShutdown(signal: string) { - console.log(signal); // e.g. "SIGINT" - } - - async initWorker() { - this.detectInfo.flag = true; - this.confirmInfo.flag = true; - let blockSync = await this.blockSyncService.findOne(this.rpcUrl); - if (!blockSync) { - blockSync = await this.blockSyncService.create({ - rpcUrl: this.rpcUrl, - lastSync: parseInt(process.env.EVM_START_BLOCK), - }); - } - // checking force latest block config - const startBlockConfig = process.env.EVM_START_BLOCK_CONFIG; - if (startBlockConfig === 'latest') { - const latestBlockNumber = await this.provider.getBlockNumber(); - this.logger.warn( - 'force running latest block from network ' + latestBlockNumber, - ); - this.updateLastSyncBlock(latestBlockNumber); - this.detectInfo.blockNumber = latestBlockNumber - 1; - this.confirmInfo.blockNumber = latestBlockNumber - 1; - } else if (startBlockConfig === 'config') { - this.logger.warn( - 'force running start block from config ' + process.env.EVM_START_BLOCK, - ); - this.updateLastSyncBlock(parseInt(process.env.EVM_START_BLOCK)); - this.detectInfo.blockNumber = parseInt(process.env.EVM_START_BLOCK) - 1; - this.confirmInfo.blockNumber = parseInt(process.env.EVM_START_BLOCK) - 1; - } else { - this.logger.warn('running start block from db ' + blockSync.lastSync); - this.detectInfo.blockNumber = blockSync.lastSync + this.confirmBlock; - this.confirmInfo.blockNumber = blockSync.lastSync; - } - this.detectInfo.flag = false; - this.confirmInfo.flag = false; - } - - @Cron(CronExpression.EVERY_10_SECONDS, { - name: 'detect', - disabled: process.env.EVM_DISABLE === 'true', - }) - async detect() { - this.logger.log('Start detect block'); - if (this.detectInfo.flag) { - return; - } - this.detectInfo.flag = true; - const lastDetectedBlock = this.detectInfo.blockNumber + 1; - - // Get the latest block number - const latestDetectedBlockNumber = await this.getBlockNumber(); - - // Scan each block - for ( - let blockNumber = lastDetectedBlock; - blockNumber <= latestDetectedBlockNumber; - blockNumber++ - ) { - try { - this.logger.debug(['DETECT', `Scanning block ${blockNumber}`]); - // handle native transfer - this.handleNativeTransfer(blockNumber, false); - // handle extracted event for erc20 and nft - this.handleLog(blockNumber, false); - this.detectInfo.blockNumber = blockNumber; - //only update last sync for confirm - // await this.updateLastSyncBlock(blockNumber); - } catch (error) { - this.logger.error([ - 'DETECT', - `Error scanning block ${blockNumber}:`, - error, - ]); - break; - } - } - - this.detectInfo.flag = false; - return; - } - - @Cron(CronExpression.EVERY_10_SECONDS, { - disabled: process.env.EVM_DISABLE === 'true', - }) - async confirm() { - if (this.confirmInfo.flag) { - return; - } - this.confirmInfo.flag = true; - - const lastConfirmedBlock = this.confirmInfo.blockNumber + 1; - - // Get the latest block number - const latestConfirmedBlockNumber = - (await this.getBlockNumber()) - this.confirmBlock; - - // Scan each block - for ( - let blockNumber = lastConfirmedBlock; - blockNumber <= latestConfirmedBlockNumber; - blockNumber++ - ) { - try { - this.logger.debug(['CONFIRM', `Scanning block ${blockNumber}`]); - // handle native transfer - this.handleNativeTransfer(blockNumber, true); - // handle extracted event for erc20 and nft - this.handleLog(blockNumber, true); - this.confirmInfo.blockNumber = blockNumber; - await this.updateLastSyncBlock(blockNumber); - } catch (error) { - this.logger.error([ - 'CONFIRM', - `Error scanning block ${blockNumber}:`, - error, - ]); - break; - } - } - - this.confirmInfo.flag = false; - return; - } - - private async handleLog( - blockNumber: number, - confirm: boolean, - ): Promise { - // Retrieve transfer event the block's logs - const logs = await this.provider.getLogs({ - fromBlock: blockNumber, - toBlock: blockNumber, - topics: [ - '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', - ], - }); - - // handle extracted event for erc20 and nft - logs.forEach((event) => { - if (event.topics.length === 3) { - // this.ethMonitorService.handleErc20Transfer(event, confirm); - this.monitorClient.emit(TopicName.ETH_ERC20_TRANSFER, { - event: event, - confirm: confirm, - }); - } else if (event.topics.length === 4) { - // this.ethMonitorService.handleErc721Transfer(event, confirm); - this.monitorClient.emit(TopicName.ETH_ERC721_TRANSFER, { - event: event, - confirm: confirm, - }); - } - }); - } - - private async handleNativeTransfer( - blockNumber: number, - confirm: boolean, - ): Promise { - // Retrieve all transaction in block - const block = await this.provider.getBlock(blockNumber, true); - - // handle extracted event for native - block.prefetchedTransactions.forEach((transaction) => { - // this.ethMonitorService.handleNativeTransfer(transaction, confirm); - this.monitorClient.emit(TopicName.ETH_NATIVE_TRANSFER, { - transaction: transaction, - confirm: confirm, - }); - }); - } - - private async updateLastSyncBlock(blockNumber: number): Promise { - // Update the last sync block in MongoDB - await this.blockSyncService.updateLastSync(this.rpcUrl, blockNumber); - } - - private async getLastSyncBlock(): Promise { - // Get the last sync block from MongoDB - const lastSyncBlock = await this.blockSyncService.findOne(this.rpcUrl); - return lastSyncBlock?.lastSync; - } - - private async getBlockNumber(): Promise { - try { - const blockNumber = await this.provider.getBlockNumber(); - return blockNumber; - } catch (error) { - this.logger.error('error while getting block number', error); - } - return 0; - } -}