diff --git a/app/.env b/app/.env index 8ba088f..4e963b0 100644 --- a/app/.env +++ b/app/.env @@ -49,13 +49,18 @@ WEBHOOK_API_URL=http://127.0.0.1:8000 # Blockchain environment variables -WEB3_PROVIDER_URL=https://goerli.infura.io/v3/d7aa2a84a44548819617058aa0a3e347 +# WEB3_PROVIDER_URL=https://goerli.infura.io/v3/d7aa2a84a44548819617058aa0a3e347 ETH_PROVIDER_URL=https://atcrpc-dev.odw.ai EVM_DISABLE=false EVM_START_BLOCK=10199699 EVM_START_BLOCK_CONFIG=latest # 'config' or 'latest' or 'db' -MANTLE_WEB3_PROVIDER_URL=https://1rpc.io/4n4xz9Bow96oZU2n9/mantle +POLYGON_PROVIDER_URL=https://goerli.infura.io/v3/d7aa2a84a44548819617058aa0a3e347 +POLYGON_DISABLE=false +POLYGON_START_BLOCK=56154832 +POLYGON_START_BLOCK_CONFIG=latest # 'config' or 'latest' or 'db' + +MANTLE_PROVIDER_URL=https://1rpc.io/4n4xz9Bow96oZU2n9/mantle MANTLE_DISABLE=true MANTLE_START_BLOCK=26620500 MANTLE_START_BLOCK_CONFIG=db # 'config' or 'latest' or 'db' \ No newline at end of file diff --git a/app/apps/monitor-service/src/ethereum/dto/eth.webhook-delivery.dto.ts b/app/apps/monitor-service/src/ethereum/dto/eth.webhook-delivery.dto.ts deleted file mode 100644 index de1cca0..0000000 --- a/app/apps/monitor-service/src/ethereum/dto/eth.webhook-delivery.dto.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { generateWebhookEventId } from '@app/utils/uuidUtils'; -import { Log, TransactionResponse, ethers } from 'ethers'; - -export enum WebhookCategory { - ERC721 = 'ERC721', - Native = 'Native', - ERC20 = 'ERC20', - INTERNAL = 'INTERNAL', -} - -export enum WebhookType { - in = 'in', - out = 'out', -} -export class WebhookDeliveryDto { - id: string; - chain: string; - monitorId: string; - hash: string; - blockNum: number; // decimal string - associatedAddress: string; - contract: { - address: string; - name: string; - symbol: string; - }; - fromAddress: string; - toAddress: string; - tokenId: string; // decimal string - tokenValue: string; // decimal string - nativeAmount: string; // decimal string - type: WebhookType; - confirm: boolean; - category: WebhookCategory; - rawLog: { - topics: string[]; - data: string; - }; - logIndex: number; - txnIndex: number; - tags: string[]; - - toString(): string { - return JSON.stringify(this); - } - - public static fromLogToERC20( - log: Log, - chain: string, - monitorId: string, - type: WebhookType, - confirm: boolean, - tokenValue: string, - ): WebhookDeliveryDto { - const instance = new WebhookDeliveryDto(); - instance.id = generateWebhookEventId(); - instance.chain = chain; - instance.monitorId = monitorId; - instance.hash = log.transactionHash; - instance.blockNum = log.blockNumber; - instance.contract = { - address: ethers.getAddress(log.address).toLowerCase(), - name: null, - symbol: null, - }; - instance.fromAddress = log.topics[1].substring(26); - instance.toAddress = log.topics[2].substring(26); - instance.tokenId = '0'; - instance.tokenValue = tokenValue; - instance.nativeAmount = '0'; - instance.rawLog = { - topics: log.topics as string[], - data: log.data, - }; - instance.type = type; - instance.confirm = confirm; - instance.category = WebhookCategory.ERC20; - - return instance; - } - - public static fromLogToERC721( - log: Log, - chain: string, - monitorId: string, - type: WebhookType, - confirm: boolean, - tokenId: string, - ): WebhookDeliveryDto { - const instance = new WebhookDeliveryDto(); - instance.id = generateWebhookEventId(); - instance.chain = chain; - instance.monitorId = monitorId; - instance.hash = log.transactionHash; - instance.blockNum = log.blockNumber; - instance.contract = { - address: ethers.getAddress(log.address).toLowerCase(), - name: null, - symbol: null, - }; - instance.fromAddress = log.topics[1].substring(26); - instance.toAddress = log.topics[2].substring(26); - instance.tokenId = tokenId; - instance.tokenValue = '0'; - instance.nativeAmount = '0'; - instance.rawLog = { - topics: log.topics as string[], - data: log.data, - }; - instance.type = type; - instance.confirm = confirm; - instance.category = WebhookCategory.ERC721; - - return instance; - } - - public static fromTransactionToNative( - transaction: TransactionResponse, - chain: string, - monitorId: string, - type: WebhookType, - confirm: boolean, - ): WebhookDeliveryDto { - const instance = new WebhookDeliveryDto(); - instance.id = generateWebhookEventId(); - instance.chain = chain; - instance.monitorId = monitorId; - instance.hash = transaction.hash; - instance.blockNum = transaction.blockNumber; - instance.fromAddress = transaction.from.toLowerCase(); - instance.toAddress = transaction.to.toLowerCase(); - - instance.tokenId = '0'; - instance.tokenValue = '0'; - instance.nativeAmount = transaction.value.toString(); - instance.type = type; - instance.confirm = confirm; - instance.category = WebhookCategory.Native; - // @todo assign data from transaction data - - return instance; - } -} diff --git a/app/apps/monitor-service/src/ethereum/ethereum.controller.spec.ts b/app/apps/monitor-service/src/ethereum/ethereum.controller.spec.ts deleted file mode 100644 index b5da1a6..0000000 --- a/app/apps/monitor-service/src/ethereum/ethereum.controller.spec.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Test, TestingModule } from '@nestjs/testing'; -import { EthereumController } from './ethereum.controller'; - -describe('EthereumController', () => { - let controller: EthereumController; - - beforeEach(async () => { - const module: TestingModule = await Test.createTestingModule({ - controllers: [EthereumController], - }).compile(); - - controller = module.get(EthereumController); - }); - - it('should be defined', () => { - expect(controller).toBeDefined(); - }); -}); diff --git a/app/apps/monitor-service/src/ethereum/ethereum.service.spec.ts b/app/apps/monitor-service/src/ethereum/ethereum.service.spec.ts deleted file mode 100644 index de9c79b..0000000 --- a/app/apps/monitor-service/src/ethereum/ethereum.service.spec.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Test, TestingModule } from '@nestjs/testing'; -import { EthereumService } from './ethereum.service'; - -describe('EthereumService', () => { - let service: EthereumService; - - beforeEach(async () => { - const module: TestingModule = await Test.createTestingModule({ - providers: [EthereumService], - }).compile(); - - service = module.get(EthereumService); - }); - - it('should be defined', () => { - expect(service).toBeDefined(); - }); -}); diff --git a/app/apps/monitor-service/src/monitor-service.module.ts b/app/apps/monitor-service/src/monitor-service.module.ts index 946cdee..5664284 100644 --- a/app/apps/monitor-service/src/monitor-service.module.ts +++ b/app/apps/monitor-service/src/monitor-service.module.ts @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { EthereumModule } from './ethereum/ethereum.module'; import { MonitorServiceController } from './monitor-service.controller'; +import { PolygonModule } from './polygon/polygon.module'; @Module({ imports: [ @@ -11,6 +12,7 @@ import { MonitorServiceController } from './monitor-service.controller'; expandVariables: true, }), EthereumModule, + PolygonModule, ], controllers: [MonitorServiceController], providers: [], diff --git a/app/apps/monitor-service/src/polygon/polygon.controller.ts b/app/apps/monitor-service/src/polygon/polygon.controller.ts new file mode 100644 index 0000000..a7d45b0 --- /dev/null +++ b/app/apps/monitor-service/src/polygon/polygon.controller.ts @@ -0,0 +1,24 @@ +import { TopicName } from '@app/utils/topicUtils'; +import { Controller } from '@nestjs/common'; +import { EventPattern } from '@nestjs/microservices'; +import { PolygonService } from './polygon.service'; + +@Controller() +export class PolygonController { + constructor(private readonly polygon: PolygonService) {} + + @EventPattern(TopicName.POLYGON_NATIVE_TRANSFER) + handleNativeTransfer(data: any): void { + this.polygon.handleNativeTransfer(data); + } + + @EventPattern(TopicName.POLYGON_ERC20_TRANSFER) + handleErc20Transfer(data: any): void { + this.polygon.handleErc20Transfer(data); + } + + @EventPattern(TopicName.POLYGON_ERC721_TRANSFER) + handleErc721Transfer(data: any): void { + this.polygon.handleErc721Transfer(data); + } +} diff --git a/app/apps/monitor-service/src/polygon/polygon.module.ts b/app/apps/monitor-service/src/polygon/polygon.module.ts new file mode 100644 index 0000000..574f82c --- /dev/null +++ b/app/apps/monitor-service/src/polygon/polygon.module.ts @@ -0,0 +1,38 @@ +import { EventHistoryModelModule } from '@app/shared_modules/event_history/event_history.module'; +import { MonitorModule } from '@app/shared_modules/monitor/monitor.module'; +import { ProjectModule } from '@app/shared_modules/project/project.module'; +import { WebhookModule } from '@app/shared_modules/webhook/webhook.module'; +import { Module } from '@nestjs/common'; +import { ClientsModule, Transport } from '@nestjs/microservices'; +import { PolygonController } from './polygon.controller'; +import { PolygonService } from './polygon.service'; + +@Module({ + providers: [PolygonService], + controllers: [PolygonController], + exports: [PolygonService], + imports: [ + ClientsModule.registerAsync([ + { + name: 'WEBHOOK_SERVICE', + useFactory: () => ({ + transport: Transport.KAFKA, + options: { + client: { + clientId: 'webhook', + brokers: process.env.KAFKA_BROKERS.split(','), + }, + consumer: { + groupId: 'webhook-consumer', + }, + }, + }), + }, + ]), + WebhookModule, + MonitorModule, + ProjectModule, + EventHistoryModelModule, + ], +}) +export class PolygonModule {} diff --git a/app/apps/monitor-service/src/polygon/polygon.service.ts b/app/apps/monitor-service/src/polygon/polygon.service.ts new file mode 100644 index 0000000..1b86ed6 --- /dev/null +++ b/app/apps/monitor-service/src/polygon/polygon.service.ts @@ -0,0 +1,381 @@ +import { PolygonEventHistoryRepository } from '@app/shared_modules/event_history/repositories/event_history.repository'; +import { + EventHistory, + WebhookType, +} from '@app/shared_modules/event_history/schemas/event_history.schema'; +import { PolygonMonitorAddressRepository } from '@app/shared_modules/monitor/repositories/monitor.address.repository'; +import { MonitorRepository } from '@app/shared_modules/monitor/repositories/monitor.repository'; +import { MonitorAddress } from '@app/shared_modules/monitor/schemas/monitor.address.schema'; +import { + Monitor, + MonitoringType, + WebhookNotification, +} from '@app/shared_modules/monitor/schemas/monitor.schema'; +import { ProjectQuotaService } from '@app/shared_modules/project/services/project.quota.service'; +import { + DispatchWebhookResponse, + WebhookService, +} from '@app/shared_modules/webhook/webhook.service'; +import { SupportedChain } from '@app/utils/supportedChain.util'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { ethers, Log, TransactionResponse } from 'ethers'; + +@Injectable() +export class PolygonService { + private readonly logger = new Logger(PolygonService.name); + + @Inject() + private readonly monitorAddressRepository: PolygonMonitorAddressRepository; + + @Inject() + private readonly monitorRepository: MonitorRepository; + + @Inject('WEBHOOK_SERVICE') + private readonly webhookClient: ClientKafka; + + @Inject() + private readonly webhookService: WebhookService; + + @Inject() + private readonly projectQuotaService: ProjectQuotaService; + + @Inject() + private readonly eventHistoryRepository: PolygonEventHistoryRepository; + + async findEthAddress(address: string): Promise { + return this.monitorAddressRepository.findByAddress(address); + } + + async findMonitor(monitorId: string): Promise { + return this.monitorRepository.findById(monitorId); + } + + async handleErc20Transfer(data: any): Promise { + const event = data.event as Log; + const confirm = data.confirm as boolean; + this.logger.debug([ + 'ERC20', + `received transaction ${event.transactionHash} from block ${event.blockNumber}`, + ]); + // Extract relevant information from the event + // const contractAddress = ethers.getAddress(event.address).toLowerCase(); + const fromAddress = ethers + .getAddress(event.topics[1].substring(26)) + .toLowerCase(); + const toAddress = ethers + .getAddress(event.topics[2].substring(26)) + .toLowerCase(); + const value = ethers.toBigInt(event.data).toString(); + + // handle from wallet + const fromWallet_monitors = await this.findEthAddress(fromAddress); + if (fromWallet_monitors) { + this.handleMatchConditionERC20( + fromWallet_monitors, + confirm, + event, + value, + WebhookType.out, + ); + } + + // handle to wallet + const toWallet_monitors = await this.findEthAddress(toAddress); + if (toWallet_monitors) { + this.handleMatchConditionERC20( + toWallet_monitors, + confirm, + event, + value, + WebhookType.in, + ); + } + } + + async handleErc721Transfer(data: any) { + const event = data.event as Log; + const confirm = data.confirm as boolean; + + this.logger.debug([ + 'ERC721', + `received transaction ${event.transactionHash} from block ${event.blockNumber}`, + ]); + + const fromAddress = ethers + .getAddress(event.topics[1].substring(26)) + .toLowerCase(); + const toAddress = ethers + .getAddress(event.topics[2].substring(26)) + .toLowerCase(); + const tokenId = ethers.toBigInt(event.topics[3]).toString(); + + // handle from wallet + const fromWallet_monitors = await this.findEthAddress(fromAddress); + if (fromWallet_monitors) { + this.handleMatchConditionERC721( + fromWallet_monitors, + confirm, + event, + tokenId, + WebhookType.out, + ); + } + + // handle to wallet + const toWallet_monitors = await this.findEthAddress(toAddress); + if (toWallet_monitors) { + this.handleMatchConditionERC721( + toWallet_monitors, + confirm, + event, + tokenId, + WebhookType.in, + ); + } + } + + async handleNativeTransfer(data: any): Promise { + const transaction = data.transaction as TransactionResponse; + const confirm = data.confirm as boolean; + + this.logger.debug([ + 'NATIVE', + `receive new transaction ${transaction.hash} from block ${transaction.blockNumber}`, + ]); + + // return if value is zero + if (transaction.value == 0n) { + return; + } + + const fromWallet_monitors = await this.findEthAddress( + transaction.from.toLowerCase(), + ); + if (fromWallet_monitors) { + this.handleMatchConditionNative( + fromWallet_monitors, + confirm, + transaction, + WebhookType.out, + ); + } + // return on to address is null. this is transaction create contract + if (!transaction.to) { + return; + } + const toWallet_monitors = await this.findEthAddress( + transaction.to.toLowerCase(), + ); + if (toWallet_monitors) { + this.handleMatchConditionNative( + toWallet_monitors, + confirm, + transaction, + WebhookType.in, + ); + } + } + + private async handleMatchConditionNative( + addresses: MonitorAddress[], + confirm: boolean, + transaction: TransactionResponse, + type: WebhookType, + ) { + // @todo check condition of monitor and event log if it match + for (const address of addresses) { + const monitor = await this.findMonitor(address.monitorId); + if (!monitor.condition.native) { + continue; + } + if ( + monitor.type !== MonitoringType.ALL && + monitor.type.toString() !== type.toString() + ) { + continue; + } + + const txnHistory = EventHistory.fromTransactionToNative( + transaction, + SupportedChain.POLYGON.name, + monitor.monitorId, + type, + confirm, + ); + + const response = await this.dispatchMessageToWebhook(monitor, txnHistory); + this.saveHistory(txnHistory, response); + + this.logger.debug( + `Confirmed: ${confirm} native transfer:\n${JSON.stringify(txnHistory)}`, + ); + } + } + + private async handleMatchConditionERC721( + addresses: MonitorAddress[], + confirm: boolean, + event: Log, + tokenId: string, + type: WebhookType, + ) { + for (const address of addresses) { + const monitor = await this.findMonitor(address.monitorId); + // ignore monitor condition on erc721 + if (!monitor.condition.erc721) { + continue; + } + if ( + monitor.type !== MonitoringType.ALL && + monitor.type.toString() !== type.toString() + ) { + continue; + } + // @todo check condition on specific cryptos + const transaction = EventHistory.fromLogToERC721( + event, + SupportedChain.POLYGON.name, + monitor.monitorId, + type, + confirm, + tokenId, + ); + + const response = await this.dispatchMessageToWebhook( + monitor, + transaction, + ); + this.saveHistory(transaction, response); + + this.logger.debug( + `Confirmed: ${confirm} ERC721 transfer ${type.toUpperCase()}:\n${JSON.stringify( + transaction, + )}`, + ); + } + } + + private async handleMatchConditionERC20( + addresses: MonitorAddress[], + confirm: boolean, + event: Log, + value: string, + type: WebhookType, + ) { + for (const address of addresses) { + const monitor = await this.findMonitor(address.monitorId); + // ignore monitor condition on erc20 + if (!monitor.condition.erc20) { + continue; + } + if ( + monitor.type !== MonitoringType.ALL && + monitor.type.toString() !== type.toString() + ) { + continue; + } + // @todo check condition on specific cryptos + const txnHistory = EventHistory.fromLogToERC20( + event, + SupportedChain.POLYGON.name, + monitor.monitorId, + type, + confirm, + value, + ); + + const response = await this.dispatchMessageToWebhook(monitor, txnHistory); + await this.saveHistory(txnHistory, response); + + this.logger.debug( + `Confirmed: ${confirm} ERC20 transfer ${type.toUpperCase()}:\n${JSON.stringify( + txnHistory, + )}`, + ); + } + } + + private async saveHistory( + event: EventHistory, + delivery: DispatchWebhookResponse, + ) { + let deliveryId: string; + if (!delivery) { + this.logger.error( + `Save event ${event.confirm ? 'CONFIRMED' : 'DETECT'} ${ + event.eventId + } with error can not dispatch this event`, + ); + deliveryId = 'ERROR'; + } else { + deliveryId = delivery.id; + } + + if (!event.confirm) { + event.deliveryIds = [deliveryId]; + await this.eventHistoryRepository.saveEventHistory(event); + } else { + await this.eventHistoryRepository.pushConfirmDeliveryId( + event.eventId, + deliveryId, + ); + } + } + + private async sendMessage(monitor: Monitor, body: EventHistory) { + if (!monitor.notification) { + return; + } + const webhook = monitor.notification as WebhookNotification; + body.tags = monitor.tags; + try { + const response = await fetch(webhook.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: webhook.authorization, + }, + body: JSON.stringify(body), + }); + if (!response.ok) { + this.logger.error( + `Error while sending webhook request to: ${webhook.url}`, + response, + ); + } + } catch (error) { + this.logger.error( + `Error while sending webhook request to: ${webhook.url}`, + error, + ); + } + } + + private async dispatchMessageToWebhook( + monitor: Monitor, + body: EventHistory, + ): Promise { + if (!monitor.notification) { + return; + } + const webhook = monitor.notification as WebhookNotification; + body.tags = monitor.tags; + try { + const respone = await this.webhookService.dispatchMessage( + monitor.webhookId, + body, + ); + this.logger.debug( + `Dispatch webhook successfully response: ${JSON.stringify(respone)}`, + ); + await this.projectQuotaService.increaseUsed(monitor.projectId); + return respone; + } catch (error) { + this.logger.error( + `Error while sending webhook request to: ${webhook.url}`, + error, + ); + } + } +} diff --git a/app/apps/onebox/src/main.module.ts b/app/apps/onebox/src/main.module.ts index 072234c..209f9d2 100644 --- a/app/apps/onebox/src/main.module.ts +++ b/app/apps/onebox/src/main.module.ts @@ -13,6 +13,7 @@ import { UsersModule } from './modules/users/users.module'; import { PollingBlockService } from './polling.block/polling.block.service'; import { DeliveryModule } from './modules/delivery/delivery.module'; import { EventHistoryModule } from './modules/event_history/event_history.module'; +import { PolygonPollingBlockService } from './polling.block/polygon.polling.block.service'; @Module({ imports: [ @@ -49,6 +50,6 @@ import { EventHistoryModule } from './modules/event_history/event_history.module DeliveryModule, EventHistoryModule, ], - providers: [GlobalService, PollingBlockService], + providers: [GlobalService, PollingBlockService, PolygonPollingBlockService], }) export class MainModule {} diff --git a/app/apps/onebox/src/modules/blocksync/blocksync.service.ts b/app/apps/onebox/src/modules/blocksync/blocksync.service.ts index 05860f1..dcec9f8 100644 --- a/app/apps/onebox/src/modules/blocksync/blocksync.service.ts +++ b/app/apps/onebox/src/modules/blocksync/blocksync.service.ts @@ -37,6 +37,10 @@ export class BlockSyncService { return this.blockSync.findOne({ rpcUrl: rpcUrl }); } + async findByChain(chain: string): Promise { + return this.blockSync.find({ chain: chain }); + } + async deleteOne(rpcUrl: string): Promise { try { await this.blockSync.deleteOne({ rpcUrl: rpcUrl }); diff --git a/app/apps/onebox/src/modules/blocksync/dto/create-blocksync.dto.ts b/app/apps/onebox/src/modules/blocksync/dto/create-blocksync.dto.ts index 46ad805..04def33 100644 --- a/app/apps/onebox/src/modules/blocksync/dto/create-blocksync.dto.ts +++ b/app/apps/onebox/src/modules/blocksync/dto/create-blocksync.dto.ts @@ -13,6 +13,8 @@ export class CreateBlockSyncDto { @IsLowercase() rpcUrl: string; + chain: string; + @IsString() @IsNotEmpty() lastSync: number; diff --git a/app/apps/onebox/src/polling.block/polling.block.service.spec.ts b/app/apps/onebox/src/polling.block/polling.block.service.spec.ts deleted file mode 100644 index cfb4684..0000000 --- a/app/apps/onebox/src/polling.block/polling.block.service.spec.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Test, TestingModule } from '@nestjs/testing'; -import { PollingBlockService } from './polling.block.service'; - -describe('PollingBlockService', () => { - let service: PollingBlockService; - - beforeEach(async () => { - const module: TestingModule = await Test.createTestingModule({ - providers: [PollingBlockService], - }).compile(); - - service = module.get(PollingBlockService); - }); - - it('should be defined', () => { - expect(service).toBeDefined(); - }); -}); diff --git a/app/apps/onebox/src/polling.block/polling.block.service.ts b/app/apps/onebox/src/polling.block/polling.block.service.ts index 38205d4..f7de75e 100644 --- a/app/apps/onebox/src/polling.block/polling.block.service.ts +++ b/app/apps/onebox/src/polling.block/polling.block.service.ts @@ -40,6 +40,7 @@ export class PollingBlockService { if (!blockSync) { blockSync = await this.blockSyncService.create({ rpcUrl: this.rpcUrl, + chain: SupportedChain.ETH.name, lastSync: parseInt(process.env.EVM_START_BLOCK), }); } diff --git a/app/apps/onebox/src/polling.block/polygon.polling.block.service.ts b/app/apps/onebox/src/polling.block/polygon.polling.block.service.ts new file mode 100644 index 0000000..1d97803 --- /dev/null +++ b/app/apps/onebox/src/polling.block/polygon.polling.block.service.ts @@ -0,0 +1,175 @@ +import { TopicName } from '@app/utils/topicUtils'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { CronExpression, SchedulerRegistry } from '@nestjs/schedule'; +import { CronJob } from 'cron'; +import { ethers } from 'ethers'; +import { BlockSyncService } from '../modules/blocksync/blocksync.service'; +import { SupportedChain } from '@app/utils/supportedChain.util'; + +@Injectable() +export class PolygonPollingBlockService { + private detectInfo = { flag: false, blockNumber: 0 }; + + private readonly logger = new Logger(PolygonPollingBlockService.name); + + constructor( + private schedulerRegistry: SchedulerRegistry, + private readonly blockSyncService: BlockSyncService, + @Inject('WORKER_CLIENT_SERVICE') + private readonly workerClient: ClientKafka, + ) {} + + rpcUrl: string; + provider: ethers.Provider; + + onModuleInit() { + this.logger.log(`The module has been initialized.`); + if (process.env.POLYGON_DISABLE === 'true') { + this.detectInfo.flag = true; + return; + } + this.rpcUrl = process.env.POLYGON_PROVIDER_URL; + this.provider = new ethers.JsonRpcProvider( + process.env.POLYGON_PROVIDER_URL, + ); + this.init(); + } + + async init() { + this.detectInfo.flag = true; + let blockSync = await this.blockSyncService.findOne(this.rpcUrl); + if (!blockSync) { + blockSync = await this.blockSyncService.create({ + rpcUrl: this.rpcUrl, + chain: SupportedChain.POLYGON.name, + lastSync: parseInt(process.env.POLYGON_START_BLOCK), + }); + } + // checking force latest block config + const startBlockConfig = process.env.POLYGON_START_BLOCK_CONFIG; + if (startBlockConfig === 'latest') { + const latestBlockNumber = await this.provider.getBlockNumber(); + this.logger.warn( + 'force running latest block from network ' + latestBlockNumber, + ); + this.updateLastSyncBlock(latestBlockNumber); + // if start at latest block, we need to minus 1 + // we suppose that we already scan at (latest block - 1) + this.detectInfo.blockNumber = latestBlockNumber - 1; + } else if (startBlockConfig === 'config') { + this.logger.warn( + 'force running start block from config ' + + process.env.POLYGON_START_BLOCK, + ); + this.updateLastSyncBlock(parseInt(process.env.POLYGON_START_BLOCK)); + // if we start at config block, we suppose that we already scan at (config block - 1) + this.detectInfo.blockNumber = + parseInt(process.env.POLYGON_START_BLOCK) - 1; + } else { + this.logger.warn('running start block from db ' + blockSync.lastSync); + // if we start at db block, we suppose that we already scan at db block + this.detectInfo.blockNumber = + blockSync.lastSync + SupportedChain.POLYGON.confirmationBlock; + } + this.detectInfo.flag = false; + this.addCronJob('polygonPollingBlock', '5'); + } + + addCronJob(name: string, seconds: string) { + const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => + this.polygonPollingBlock(), + ); + + this.schedulerRegistry.addCronJob(name, job); + job.start(); + + this.logger.warn(`job ${name} added for each ${seconds} seconds!`); + } + + private async updateLastSyncBlock(blockNumber: number): Promise { + // Update the last sync block in MongoDB + await this.blockSyncService.updateLastSync(this.rpcUrl, blockNumber); + } + + async polygonPollingBlock() { + this.logger.debug('Polygon Start polling block number'); + if (this.detectInfo.flag) { + this.logger.error('conflict with last job. quit current job'); + return; + } + this.detectInfo.flag = true; + const lastDetectedBlock = this.detectInfo.blockNumber + 1; + + // Get the latest block number + const latestDetectedBlockNumber = await this.getBlockNumber(); + + // Scan each block + for ( + let blockNumber = lastDetectedBlock; + blockNumber <= latestDetectedBlockNumber; + blockNumber++ + ) { + try { + this.logger.log(`last emitted block ${blockNumber}`); + + // emit event detect block with blocknumber + this.logger.debug(['DETECT', `send block ${blockNumber}`]); + this.workerClient.emit(TopicName.POLYGON_DETECTED_BLOCK, { + blockNumber: blockNumber, + }); + + this.logger.debug([ + 'CONFIRM', + `send block ${ + blockNumber - SupportedChain.POLYGON.confirmationBlock + }`, + ]); + // emit event confirm block with block number - confirm block + this.workerClient.emit(TopicName.POLYGON_CONFIRMED_BLOCK, { + blockNumber: blockNumber - SupportedChain.POLYGON.confirmationBlock, + }); + + this.detectInfo.blockNumber = blockNumber; + + //only update last sync for confirm + await this.updateLastSyncBlock( + blockNumber - SupportedChain.POLYGON.confirmationBlock, + ); + } catch (error) { + this.logger.error([ + 'DETECT', + `Error scanning block ${blockNumber}:`, + error, + ]); + break; + } + } + + this.detectInfo.flag = false; + return; + } + + private async getBlockNumber(): Promise { + try { + // Perform an asynchronous operation (e.g., fetching data) + const blockNumber = await Promise.race([ + this.provider.getBlockNumber(), // Your asynchronous operation + delay(5000).then(() => { + throw new Error('Get block number Timeout'); + }), // Timeout promise + ]); + this.logger.log('got latest block from network: ' + blockNumber); + return blockNumber; + } catch (error) { + this.logger.error('error while getting block number', error); + } + return 0; + } +} + +function delay(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} diff --git a/app/apps/worker-service/src/worker-service.controller.ts b/app/apps/worker-service/src/worker-service.controller.ts index ac17e33..5d3b288 100644 --- a/app/apps/worker-service/src/worker-service.controller.ts +++ b/app/apps/worker-service/src/worker-service.controller.ts @@ -3,12 +3,14 @@ import { WorkerServiceService } from './worker-service.service'; import { EventPattern } from '@nestjs/microservices'; import { TopicName } from '@app/utils/topicUtils'; import { EthereumWorker } from './worker/ethereum.worker'; +import { PolygonWorker } from './worker/polygon.worker'; @Controller() export class WorkerServiceController { constructor( private readonly workerServiceService: WorkerServiceService, private readonly ethereumWorker: EthereumWorker, + private readonly polygonWorker: PolygonWorker, ) {} @Get() @@ -23,6 +25,16 @@ export class WorkerServiceController { @EventPattern(TopicName.ETH_CONFIRMED_BLOCK) async ethConfirmBlock(data: any) { - this.ethereumWorker.ethHandleConfirmedBlock(data); + this.polygonWorker.ethHandleConfirmedBlock(data); + } + + @EventPattern(TopicName.POLYGON_DETECTED_BLOCK) + async polygonDetectBlock(data: any) { + this.polygonWorker.ethHandleDetectedBlock(data); + } + + @EventPattern(TopicName.POLYGON_CONFIRMED_BLOCK) + async polygonConfirmBlock(data: any) { + this.polygonWorker.ethHandleConfirmedBlock(data); } } diff --git a/app/apps/worker-service/src/worker-service.module.ts b/app/apps/worker-service/src/worker-service.module.ts index 85a53a2..b9300df 100644 --- a/app/apps/worker-service/src/worker-service.module.ts +++ b/app/apps/worker-service/src/worker-service.module.ts @@ -5,6 +5,7 @@ import { ScheduleModule } from '@nestjs/schedule'; import { WorkerServiceController } from './worker-service.controller'; import { EthereumWorker } from './worker/ethereum.worker'; import { WorkerServiceService } from './worker-service.service'; +import { PolygonWorker } from './worker/polygon.worker'; @Module({ imports: [ @@ -32,6 +33,6 @@ import { WorkerServiceService } from './worker-service.service'; ScheduleModule.forRoot(), ], controllers: [WorkerServiceController], - providers: [WorkerServiceService, EthereumWorker], + providers: [WorkerServiceService, EthereumWorker, PolygonWorker], }) export class WorkerServiceModule {} diff --git a/app/apps/worker-service/src/worker/ethereum.worker.ts b/app/apps/worker-service/src/worker/ethereum.worker.ts index 82dc221..4d350b5 100644 --- a/app/apps/worker-service/src/worker/ethereum.worker.ts +++ b/app/apps/worker-service/src/worker/ethereum.worker.ts @@ -36,9 +36,9 @@ export class EthereumWorker { }); // handle native transfer - this.emitNativeTransaction(block, false); + await this.emitNativeTransaction(block, false); // handle extracted event for erc20 and nft - this.emitLog(logs, false); + await this.emitLog(logs, false); //only update last sync for confirm // await this.updateLastSyncBlock(blockNumber); } catch (error) { diff --git a/app/apps/worker-service/src/worker/polygon.worker.ts b/app/apps/worker-service/src/worker/polygon.worker.ts new file mode 100644 index 0000000..ed8cc86 --- /dev/null +++ b/app/apps/worker-service/src/worker/polygon.worker.ts @@ -0,0 +1,118 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { Block, ethers, Log } from 'ethers'; +import { TopicName } from '@app/utils/topicUtils'; + +@Injectable() +export class PolygonWorker { + private readonly logger = new Logger(PolygonWorker.name); + rpcUrl: string; + provider: ethers.Provider; + + constructor( + @Inject('MONITOR_CLIENT_SERVICE') private monitorClient: ClientKafka, + ) { + if (process.env.POLYGON_PROVIDER_URL) { + this.rpcUrl = process.env.POLYGON_PROVIDER_URL; + this.provider = new ethers.JsonRpcProvider( + process.env.POLYGON_PROVIDER_URL, + ); + } + } + + async ethHandleDetectedBlock(data: { blockNumber: number }) { + const blockNumber = data.blockNumber; + + try { + this.logger.log(`DETECT handle block ${blockNumber}`); + // Retrieve all transaction in block + const block = await this.provider.getBlock(blockNumber, true); + + // Retrieve transfer event the block's logs + const logs = await this.provider.getLogs({ + fromBlock: blockNumber, + toBlock: blockNumber, + topics: [ + '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', + ], + }); + + // handle native transfer + await this.emitNativeTransaction(block, false); + // handle extracted event for erc20 and nft + await this.emitLog(logs, false); + //only update last sync for confirm + // await this.updateLastSyncBlock(blockNumber); + } catch (error) { + // @todo re-add error block into kafka, and save in db + this.logger.error([ + 'DETECT', + `Error scanning block ${blockNumber}:`, + error, + ]); + } + + return; + } + + async ethHandleConfirmedBlock(data: { blockNumber: number }) { + const blockNumber = data.blockNumber; + try { + this.logger.log(`CONFIRM Scanning block ${blockNumber}`); + // Retrieve all transaction in block + const block = await this.provider.getBlock(blockNumber, true); + // Retrieve transfer event the block's logs + const logs = await this.provider.getLogs({ + fromBlock: blockNumber, + toBlock: blockNumber, + topics: [ + '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', + ], + }); + // handle native transfer + this.emitNativeTransaction(block, true); + // handle extracted event for erc20 and nft + this.emitLog(logs, true); + } catch (error) { + this.logger.error([ + 'CONFIRM', + `Error scanning block ${blockNumber}:`, + error, + ]); + } + return; + } + + private async emitLog(logs: Log[], confirm: boolean): Promise { + // handle extracted event for erc20 and nft + logs.forEach((event) => { + if (event.topics.length === 3) { + this.logger.debug(`emit event on ERC20 ${JSON.stringify(event)}`); + this.monitorClient.emit(TopicName.POLYGON_ERC20_TRANSFER, { + event: event, + confirm: confirm, + }); + } else if (event.topics.length === 4) { + this.logger.debug(`emit event on ERC721 ${JSON.stringify(event)}`); + this.monitorClient.emit(TopicName.POLYGON_ERC721_TRANSFER, { + event: event, + confirm: confirm, + }); + } + }); + } + + private async emitNativeTransaction( + block: Block, + confirm: boolean, + ): Promise { + // handle extracted event for native + block.prefetchedTransactions.forEach((transaction) => { + this.logger.debug(`emit event on NATIVE ${JSON.stringify(transaction)}`); + this.monitorClient.emit(TopicName.POLYGON_NATIVE_TRANSFER, { + transaction: transaction, + confirm: confirm, + }); + }); + } +} diff --git a/app/libs/shared_modules/src/event_history/event_history.module.ts b/app/libs/shared_modules/src/event_history/event_history.module.ts index 6a17d20..aaa51f4 100644 --- a/app/libs/shared_modules/src/event_history/event_history.module.ts +++ b/app/libs/shared_modules/src/event_history/event_history.module.ts @@ -3,6 +3,7 @@ import { Module } from '@nestjs/common'; import { BscEventHistoryRepository, EthEventHistoryRepository, + PolygonEventHistoryRepository, } from './repositories/event_history.repository'; import { EventHistoryProviders } from './event_history.provider'; @@ -12,11 +13,13 @@ import { EventHistoryProviders } from './event_history.provider'; ...EventHistoryProviders, EthEventHistoryRepository, BscEventHistoryRepository, + PolygonEventHistoryRepository, ], exports: [ ...EventHistoryProviders, EthEventHistoryRepository, BscEventHistoryRepository, + PolygonEventHistoryRepository, ], }) export class EventHistoryModelModule {} diff --git a/app/libs/shared_modules/src/event_history/event_history.provider.ts b/app/libs/shared_modules/src/event_history/event_history.provider.ts index c7176ea..37edb84 100644 --- a/app/libs/shared_modules/src/event_history/event_history.provider.ts +++ b/app/libs/shared_modules/src/event_history/event_history.provider.ts @@ -22,4 +22,14 @@ export const EventHistoryProviders = [ ), inject: ['DATABASE_CONNECTION'], }, + { + provide: 'POLYGON_EVENT_HISTORY_MODEL', + useFactory: (connection: Connection) => + connection.model( + 'PolygonEventHistory', + EventHistorySchema, + 'polygon_event_history', + ), + inject: ['DATABASE_CONNECTION'], + }, ]; diff --git a/app/libs/shared_modules/src/event_history/repositories/event_history.repository.ts b/app/libs/shared_modules/src/event_history/repositories/event_history.repository.ts index c595713..da1d83b 100644 --- a/app/libs/shared_modules/src/event_history/repositories/event_history.repository.ts +++ b/app/libs/shared_modules/src/event_history/repositories/event_history.repository.ts @@ -82,3 +82,12 @@ export class BscEventHistoryRepository extends EventHistoryRepository { super(MonitorNetwork.BSC, model); } } + +@Injectable() +export class PolygonEventHistoryRepository extends EventHistoryRepository { + constructor( + @Inject('POLYGON_EVENT_HISTORY_MODEL') model: Model, + ) { + super(MonitorNetwork.Polygon, model); + } +} diff --git a/app/libs/shared_modules/src/monitor/monitor.module.ts b/app/libs/shared_modules/src/monitor/monitor.module.ts index 6cc00a5..a1f0d08 100644 --- a/app/libs/shared_modules/src/monitor/monitor.module.ts +++ b/app/libs/shared_modules/src/monitor/monitor.module.ts @@ -4,6 +4,7 @@ import { MonitorProviders } from './monitor.provider'; import { BscMonitorAddressRepository, EthMonitorAddressRepository, + PolygonMonitorAddressRepository, } from './repositories/monitor.address.repository'; import { MonitorRepository } from './repositories/monitor.repository'; import { MonitorWebhookService } from './services/monitor.webhook.service'; @@ -16,6 +17,7 @@ import { MonitorWebhookService } from './services/monitor.webhook.service'; MonitorRepository, EthMonitorAddressRepository, BscMonitorAddressRepository, + PolygonMonitorAddressRepository, ], exports: [ ...MonitorProviders, @@ -23,6 +25,7 @@ import { MonitorWebhookService } from './services/monitor.webhook.service'; MonitorRepository, EthMonitorAddressRepository, BscMonitorAddressRepository, + PolygonMonitorAddressRepository, ], }) export class MonitorModule {} diff --git a/app/libs/shared_modules/src/monitor/monitor.provider.ts b/app/libs/shared_modules/src/monitor/monitor.provider.ts index ab860aa..24585c9 100644 --- a/app/libs/shared_modules/src/monitor/monitor.provider.ts +++ b/app/libs/shared_modules/src/monitor/monitor.provider.ts @@ -29,4 +29,14 @@ export const MonitorProviders = [ ), inject: ['DATABASE_CONNECTION'], }, + { + provide: 'POLYGON_MONITOR_ADDRESS_MODEL', + useFactory: (connection: Connection) => + connection.model( + 'PolygonMonitorAddress', + MonitorAddressSchema, + 'polygon_monitor_address', + ), + inject: ['DATABASE_CONNECTION'], + }, ]; diff --git a/app/libs/shared_modules/src/monitor/repositories/monitor.address.repository.ts b/app/libs/shared_modules/src/monitor/repositories/monitor.address.repository.ts index c61ff78..ddae271 100644 --- a/app/libs/shared_modules/src/monitor/repositories/monitor.address.repository.ts +++ b/app/libs/shared_modules/src/monitor/repositories/monitor.address.repository.ts @@ -110,3 +110,12 @@ export class BscMonitorAddressRepository extends MonitorAddressRepository { super(MonitorNetwork.BSC, model); } } + +@Injectable() +export class PolygonMonitorAddressRepository extends MonitorAddressRepository { + constructor( + @Inject('POLYGON_MONITOR_ADDRESS_MODEL') model: Model, + ) { + super(MonitorNetwork.Polygon, model); + } +} diff --git a/app/libs/shared_modules/src/webhook/webhook.service.ts b/app/libs/shared_modules/src/webhook/webhook.service.ts index 0086376..4da01b9 100644 --- a/app/libs/shared_modules/src/webhook/webhook.service.ts +++ b/app/libs/shared_modules/src/webhook/webhook.service.ts @@ -209,7 +209,9 @@ export class WebhookService { ); throw new Error('get deliveries'); } - this.logger.debug(`send webhook success with webhookId: ${webhookId}`); + this.logger.debug( + `Retrieve webhook deliveries with webhookId: ${webhookId}`, + ); return (await response.json()) as DeliveriesResponseDto; } catch (e) { this.logger.error( diff --git a/app/libs/utils/src/topicUtils.ts b/app/libs/utils/src/topicUtils.ts index b87a40a..6119df8 100644 --- a/app/libs/utils/src/topicUtils.ts +++ b/app/libs/utils/src/topicUtils.ts @@ -4,4 +4,10 @@ export enum TopicName { ETH_NATIVE_TRANSFER = 'eth-native-transfer', ETH_ERC20_TRANSFER = 'eth-erc20-transfer', ETH_ERC721_TRANSFER = 'eth-erc721-transfer', + + POLYGON_DETECTED_BLOCK = 'polygon-detect-block', + POLYGON_CONFIRMED_BLOCK = 'polygon-confirm-block', + POLYGON_NATIVE_TRANSFER = 'polygon-native-transfer', + POLYGON_ERC20_TRANSFER = 'polygon-erc20-transfer', + POLYGON_ERC721_TRANSFER = 'polygon-erc721-transfer', }