Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
cauta committed Apr 15, 2024
1 parent 0f24f61 commit 894d5e3
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 350 deletions.
3 changes: 1 addition & 2 deletions app/apps/onebox/src/polling.block/polling.block.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { TopicName } from '@app/utils/topicUtils';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { EthereumWorker } from 'apps/worker-service/src/worker/evm.worker';
import { CronJob } from 'cron';
import { ethers } from 'ethers';
import { BlockSyncService } from '../modules/blocksync/blocksync.service';
Expand All @@ -12,7 +11,7 @@ import { SupportedChain } from '@app/utils/supportedChain.util';
export class PollingBlockService {
private detectInfo = { flag: false, blockNumber: 0 };

private readonly logger = new Logger(EthereumWorker.name);
private readonly logger = new Logger(PollingBlockService.name);

constructor(
private schedulerRegistry: SchedulerRegistry,
Expand Down
10 changes: 7 additions & 3 deletions app/apps/worker-service/src/worker-service.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ import { Controller, Get } from '@nestjs/common';
import { WorkerServiceService } from './worker-service.service';
import { EventPattern } from '@nestjs/microservices';
import { TopicName } from '@app/utils/topicUtils';
import { EthereumWorker } from './worker/ethereum.worker';

@Controller()
export class WorkerServiceController {
constructor(private readonly workerServiceService: WorkerServiceService) {}
constructor(
private readonly workerServiceService: WorkerServiceService,
private readonly ethereumWorker: EthereumWorker,
) {}

@Get()
getHello(): string {
Expand All @@ -14,11 +18,11 @@ export class WorkerServiceController {

@EventPattern(TopicName.ETH_DETECTED_BLOCK)
async ethDetectBlock(data: any) {
this.workerServiceService.ethHandleDetectedBlock(data);
this.ethereumWorker.ethHandleDetectedBlock(data);
}

@EventPattern(TopicName.ETH_CONFIRMED_BLOCK)
async ethConfirmBlock(data: any) {
this.workerServiceService.ethHandleConfirmedBlock(data);
this.ethereumWorker.ethHandleConfirmedBlock(data);
}
}
6 changes: 2 additions & 4 deletions app/apps/worker-service/src/worker-service.module.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { DatabaseModule } from '@app/database';
import { GlobalModule } from '@app/global';
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ScheduleModule } from '@nestjs/schedule';
import { BlockSyncModule } from './blocksync/blocksync.module';
import { WorkerServiceController } from './worker-service.controller';
import { EthereumWorker } from './worker/ethereum.worker';
import { WorkerServiceService } from './worker-service.service';
import { EthereumWorker } from './worker/evm.worker';

@Module({
imports: [
Expand Down Expand Up @@ -36,6 +34,6 @@ import { EthereumWorker } from './worker/evm.worker';
ScheduleModule.forRoot(),
],
controllers: [WorkerServiceController],
providers: [WorkerServiceService],
providers: [WorkerServiceService, EthereumWorker],
})
export class WorkerServiceModule {}
106 changes: 2 additions & 104 deletions app/apps/worker-service/src/worker-service.service.ts
Original file line number Diff line number Diff line change
@@ -1,117 +1,15 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { ethers } from 'ethers';
import { BlockSyncService } from './blocksync/blocksync.service';
import { TopicName } from '@app/utils/topicUtils';

@Injectable()
export class WorkerServiceService {
private readonly logger = new Logger(WorkerServiceService.name);
rpcUrl: string;
provider: ethers.Provider;

constructor(
@Inject('MONITOR_CLIENT_SERVICE') private monitorClient: ClientKafka,
private readonly blockSyncService: BlockSyncService,
) {
if (process.env.ETH_PROVIDER_URL) {
this.rpcUrl = process.env.ETH_PROVIDER_URL;
this.provider = new ethers.JsonRpcProvider(process.env.ETH_PROVIDER_URL);
}
}
) {}

getHello(): string {
console.log('Emit blockchain-event to kafka');
this.monitorClient.emit('blockchain-event', { userId: 'abc' });
return 'Hello World!';
}

async ethHandleDetectedBlock(data: { blockNumber: number }) {
const blockNumber = data.blockNumber;

try {
this.logger.debug(['DETECT', `handle block ${blockNumber}`]);
// handle native transfer
this.handleNativeTransfer(blockNumber, false);
// handle extracted event for erc20 and nft
this.handleLog(blockNumber, false);
//only update last sync for confirm
// await this.updateLastSyncBlock(blockNumber);
} catch (error) {
// @todo re-add error block into kafka, and save in db
this.logger.error([
'DETECT',
`Error scanning block ${blockNumber}:`,
error,
]);
}

return;
}

async ethHandleConfirmedBlock(data: { blockNumber: number }) {
const blockNumber = data.blockNumber;
try {
this.logger.debug(['CONFIRM', `Scanning block ${blockNumber}`]);
// handle native transfer
this.handleNativeTransfer(blockNumber, true);
// handle extracted event for erc20 and nft
this.handleLog(blockNumber, true);
} catch (error) {
this.logger.error([
'CONFIRM',
`Error scanning block ${blockNumber}:`,
error,
]);
}
return;
}

private async handleLog(
blockNumber: number,
confirm: boolean,
): Promise<void> {
// Retrieve transfer event the block's logs
const logs = await this.provider.getLogs({
fromBlock: blockNumber,
toBlock: blockNumber,
topics: [
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
],
});

// handle extracted event for erc20 and nft
logs.forEach((event) => {
if (event.topics.length === 3) {
// this.ethMonitorService.handleErc20Transfer(event, confirm);
this.monitorClient.emit(TopicName.ETH_ERC20_TRANSFER, {
event: event,
confirm: confirm,
});
} else if (event.topics.length === 4) {
// this.ethMonitorService.handleErc721Transfer(event, confirm);
this.monitorClient.emit(TopicName.ETH_ERC721_TRANSFER, {
event: event,
confirm: confirm,
});
}
});
}

private async handleNativeTransfer(
blockNumber: number,
confirm: boolean,
): Promise<void> {
// Retrieve all transaction in block
const block = await this.provider.getBlock(blockNumber, true);

// handle extracted event for native
block.prefetchedTransactions.forEach((transaction) => {
// this.ethMonitorService.handleNativeTransfer(transaction, confirm);
this.monitorClient.emit(TopicName.ETH_NATIVE_TRANSFER, {
transaction: transaction,
confirm: confirm,
});
});
}
}
109 changes: 109 additions & 0 deletions app/apps/worker-service/src/worker/ethereum.worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { ethers } from 'ethers';
import { TopicName } from '@app/utils/topicUtils';

@Injectable()
export class EthereumWorker {
private readonly logger = new Logger(EthereumWorker.name);
rpcUrl: string;
provider: ethers.Provider;

constructor(
@Inject('MONITOR_CLIENT_SERVICE') private monitorClient: ClientKafka,
) {
if (process.env.ETH_PROVIDER_URL) {
this.rpcUrl = process.env.ETH_PROVIDER_URL;
this.provider = new ethers.JsonRpcProvider(process.env.ETH_PROVIDER_URL);
}
}

async ethHandleDetectedBlock(data: { blockNumber: number }) {
const blockNumber = data.blockNumber;

try {
this.logger.debug(['DETECT', `handle block ${blockNumber}`]);
// handle native transfer
this.handleNativeTransfer(blockNumber, false);
// handle extracted event for erc20 and nft
this.handleLog(blockNumber, false);
//only update last sync for confirm
// await this.updateLastSyncBlock(blockNumber);
} catch (error) {
// @todo re-add error block into kafka, and save in db
this.logger.error([
'DETECT',
`Error scanning block ${blockNumber}:`,
error,
]);
}

return;
}

async ethHandleConfirmedBlock(data: { blockNumber: number }) {
const blockNumber = data.blockNumber;
try {
this.logger.debug(['CONFIRM', `Scanning block ${blockNumber}`]);
// handle native transfer
this.handleNativeTransfer(blockNumber, true);
// handle extracted event for erc20 and nft
this.handleLog(blockNumber, true);
} catch (error) {
this.logger.error([
'CONFIRM',
`Error scanning block ${blockNumber}:`,
error,
]);
}
return;
}

private async handleLog(
blockNumber: number,
confirm: boolean,
): Promise<void> {
// Retrieve transfer event the block's logs
const logs = await this.provider.getLogs({
fromBlock: blockNumber,
toBlock: blockNumber,
topics: [
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
],
});

// handle extracted event for erc20 and nft
logs.forEach((event) => {
if (event.topics.length === 3) {
// this.ethMonitorService.handleErc20Transfer(event, confirm);
this.monitorClient.emit(TopicName.ETH_ERC20_TRANSFER, {
event: event,
confirm: confirm,
});
} else if (event.topics.length === 4) {
// this.ethMonitorService.handleErc721Transfer(event, confirm);
this.monitorClient.emit(TopicName.ETH_ERC721_TRANSFER, {
event: event,
confirm: confirm,
});
}
});
}

private async handleNativeTransfer(
blockNumber: number,
confirm: boolean,
): Promise<void> {
// Retrieve all transaction in block
const block = await this.provider.getBlock(blockNumber, true);

// handle extracted event for native
block.prefetchedTransactions.forEach((transaction) => {
// this.ethMonitorService.handleNativeTransfer(transaction, confirm);
this.monitorClient.emit(TopicName.ETH_NATIVE_TRANSFER, {
transaction: transaction,
confirm: confirm,
});
});
}
}
Loading

0 comments on commit 894d5e3

Please sign in to comment.