diff --git a/src/helpers/elastic.service.ts b/src/helpers/elastic.service.ts deleted file mode 100644 index 51f28fc11..000000000 --- a/src/helpers/elastic.service.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { Client } from '@elastic/elasticsearch'; -import { Search } from '@elastic/elasticsearch/api/requestParams'; -import { Inject, Injectable } from '@nestjs/common'; -import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; -import { generateComputeLogMessage } from 'src/utils/generate-log-message'; -import { Logger } from 'winston'; -import { ElasticQuery } from '@multiversx/sdk-nestjs-elastic'; - -@Injectable() -export class ElasticService { - private elasticClient: Client; - constructor( - @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, - ) { - this.elasticClient = new Client({ - node: process.env.ELASTICSEARCH_URL, - }); - } - - async getCount( - collection: string, - elasticQueryAdapter: ElasticQuery | undefined = undefined, - ) { - const query = elasticQueryAdapter?.toJson().query; - try { - const result: any = await this.elasticClient.count({ - index: collection, - body: { - query, - }, - }); - return result.body.count; - } catch (error) { - const logMessage = generateComputeLogMessage( - ElasticService.name, - this.getCount.name, - 'TX count', - error, - ); - this.logger.error(logMessage); - throw error; - } - } - - async scrollSearch(params: Search): Promise { - let response = await this.elasticClient.search(params); - const hits = []; - let scroll = 1; - while (true) { - const sourceHits = response.body.hits.hits; - - if (sourceHits.length === 0) { - break; - } - - hits.push(...sourceHits); - - if (!response.body._scroll_id) { - break; - } - - response = await this.elasticClient.scroll({ - scroll_id: response.body._scroll_id, - scroll: `${scroll}s`, - }); - scroll += 1; - } - return hits; - } - - async getList( - collection: string, - key: string, - elasticQueryAdapter: ElasticQuery, - ): Promise { - const query = elasticQueryAdapter.toJson().query; - try { - return await this.scrollSearch({ - index: collection, - size: 10000, - scroll: '5s', - _source: [key], - body: { - query, - }, - }); - } catch (error) { - const logMessage = generateComputeLogMessage( - ElasticService.name, - this.getList.name, - 'TX list', - error, - ); - this.logger.error(logMessage); - throw error; - } - } -} diff --git a/src/modules/governance/governance.module.ts b/src/modules/governance/governance.module.ts index 25aa133fb..23554e9da 100644 --- a/src/modules/governance/governance.module.ts +++ b/src/modules/governance/governance.module.ts @@ -28,7 +28,7 @@ import { GovernanceAbiFactory } from './services/governance.abi.factory'; import { GovernanceServiceFactory } from './services/governance.factory'; import { GovernanceOldEnergyAbiService } from './services/governance.old.energy.abi.service'; import { LockedAssetModule } from '../locked-asset-factory/locked-asset.module'; -import { ElasticService } from 'src/helpers/elastic.service'; +import { ElasticSearchModule } from 'src/services/elastic-search/elastic.search.module'; @Module({ imports: [ @@ -38,6 +38,7 @@ import { ElasticService } from 'src/helpers/elastic.service'; TokenModule, EnergyModule, LockedAssetModule, + ElasticSearchModule, ], providers: [ GovernanceTokenSnapshotService, @@ -57,7 +58,6 @@ import { ElasticService } from 'src/helpers/elastic.service'; GovernanceEnergyContractResolver, GovernanceTokenSnapshotContractResolver, GovernanceProposalResolver, - ElasticService, ], exports: [ GovernanceTokenSnapshotAbiService, diff --git a/src/modules/governance/services/governance.compute.service.ts b/src/modules/governance/services/governance.compute.service.ts index 64626f704..7334a53ad 100644 --- a/src/modules/governance/services/governance.compute.service.ts +++ b/src/modules/governance/services/governance.compute.service.ts @@ -6,37 +6,57 @@ import { CacheTtlInfo } from '../../../services/caching/cache.ttl.info'; import { GovernanceSetterService } from './governance.setter.service'; import { Address } from '@multiversx/sdk-core/out'; import { decimalToHex } from '../../../utils/token.converters'; -import { ElasticQuery, ElasticSortOrder, QueryType } from '@multiversx/sdk-nestjs-elastic'; -import { ElasticService } from 'src/helpers/elastic.service'; import { toVoteType } from '../../../utils/governance'; +import { ElasticSearchEventsService } from 'src/services/elastic-search/services/es.events.service'; +import { RawElasticEventType } from 'src/services/elastic-search/entities/raw.elastic.event'; @Injectable() export class GovernanceComputeService { constructor( - private readonly elasticService: ElasticService, + private readonly elasticEventsService: ElasticSearchEventsService, private readonly governanceSetter: GovernanceSetterService, - ) { - } + ) {} - async userVotedProposalsWithVoteType(scAddress: string, userAddress: string, proposalId: number): Promise { - const currentCachedProposalVoteTypes = await this.userVoteTypesForContract(scAddress, userAddress); - const cachedVoteType = currentCachedProposalVoteTypes.find((proposal) => proposal.proposalId === proposalId); + async userVotedProposalsWithVoteType( + scAddress: string, + userAddress: string, + proposalId: number, + ): Promise { + const currentCachedProposalVoteTypes = + await this.userVoteTypesForContract(scAddress, userAddress); + const cachedVoteType = currentCachedProposalVoteTypes.find( + (proposal) => proposal.proposalId === proposalId, + ); if (cachedVoteType) { return cachedVoteType.vote; } - const log = await this.getVoteLog('vote', scAddress, userAddress, proposalId); + const voteEvents = await this.getVoteEvents( + scAddress, + userAddress, + proposalId, + ); + let voteType = VoteType.NotVoted; - if (log.length > 0) { - const voteEvent = log[0]._source.events.find((event) => event.identifier === 'vote'); - voteType = toVoteType(atob(voteEvent.topics[0])); + if (voteEvents.length > 0) { + const voteEvent = voteEvents.find( + (event) => event.identifier === 'vote', + ); + voteType = toVoteType( + Buffer.from(voteEvent.topics[0], 'hex').toString(), + ); } + const proposalVoteType = { proposalId, vote: voteType, - } + }; currentCachedProposalVoteTypes.push(proposalVoteType); - await this.governanceSetter.userVoteTypesForContract(scAddress, userAddress, currentCachedProposalVoteTypes); + await this.governanceSetter.userVoteTypesForContract( + scAddress, + userAddress, + currentCachedProposalVoteTypes, + ); return proposalVoteType.vote; } @@ -46,43 +66,22 @@ export class GovernanceComputeService { remoteTtl: CacheTtlInfo.ContractState.remoteTtl, localTtl: CacheTtlInfo.ContractState.localTtl, }) - async userVoteTypesForContract(scAddress: string, userAddress: string): Promise<{ proposalId: number, vote: VoteType }[]> { + async userVoteTypesForContract( + scAddress: string, + userAddress: string, + ): Promise<{ proposalId: number; vote: VoteType }[]> { return []; } - private async getVoteLog( - eventName: string, + private async getVoteEvents( scAddress: string, callerAddress: string, proposalId: number, - ): Promise { - const elasticQueryAdapter: ElasticQuery = new ElasticQuery(); - const encodedProposalId = Buffer.from(decimalToHex(proposalId), 'hex').toString('base64'); - const encodedCallerAddress = Buffer.from(Address.fromString(callerAddress).hex(), 'hex').toString('base64'); - elasticQueryAdapter.condition.must = [ - QueryType.Match('address', scAddress), - QueryType.Nested('events', [ - QueryType.Match('events.address', scAddress), - QueryType.Match('events.identifier', eventName), - ]), - QueryType.Nested('events', [ - QueryType.Match('events.topics', encodedProposalId), - ]), - QueryType.Nested('events', [ - QueryType.Match('events.topics', encodedCallerAddress), - ]), - ]; - - elasticQueryAdapter.sort = [ - { name: 'timestamp', order: ElasticSortOrder.ascending }, - ]; - - - const list = await this.elasticService.getList( - 'logs', - '', - elasticQueryAdapter, + ): Promise { + return await this.elasticEventsService.getGovernanceVotes( + scAddress, + Address.fromString(callerAddress).hex(), + decimalToHex(proposalId), ); - return list; } } diff --git a/src/modules/pair/services/pair.compute.service.ts b/src/modules/pair/services/pair.compute.service.ts index 5fcbf1b2e..9a1287617 100644 --- a/src/modules/pair/services/pair.compute.service.ts +++ b/src/modules/pair/services/pair.compute.service.ts @@ -24,8 +24,8 @@ import { FarmComputeServiceV2 } from 'src/modules/farm/v2/services/farm.v2.compu import { StakingComputeService } from 'src/modules/staking/services/staking.compute.service'; import { CacheService } from '@multiversx/sdk-nestjs-cache'; import { getAllKeys } from 'src/utils/get.many.utils'; -import { ESTransactionsService } from 'src/services/elastic-search/services/es.transactions.service'; import moment from 'moment'; +import { ElasticSearchEventsService } from 'src/services/elastic-search/services/es.events.service'; @Injectable() export class PairComputeService implements IPairComputeService { @@ -47,7 +47,7 @@ export class PairComputeService implements IPairComputeService { private readonly farmCompute: FarmComputeServiceV2, private readonly stakingCompute: StakingComputeService, private readonly cachingService: CacheService, - private readonly elasticTransactionsService: ESTransactionsService, + private readonly elasticEventsService: ElasticSearchEventsService, ) {} async getTokenPrice(pairAddress: string, tokenID: string): Promise { @@ -690,9 +690,7 @@ export class PairComputeService implements IPairComputeService { } async computeTradesCount(pairAddress: string): Promise { - return await this.elasticTransactionsService.computePairSwapCount( - pairAddress, - ); + return await this.elasticEventsService.getPairSwapCount(pairAddress); } @ErrorLoggerAsync({ @@ -711,7 +709,7 @@ export class PairComputeService implements IPairComputeService { const end = moment.utc().unix(); const start = moment.unix(end).subtract(1, 'day').unix(); - return await this.elasticTransactionsService.computePairSwapCount( + return await this.elasticEventsService.getPairSwapCount( pairAddress, start, end, diff --git a/src/modules/pair/specs/pair.compute.service.spec.ts b/src/modules/pair/specs/pair.compute.service.spec.ts index 1f2ebe65e..7d7ec100c 100644 --- a/src/modules/pair/specs/pair.compute.service.spec.ts +++ b/src/modules/pair/specs/pair.compute.service.spec.ts @@ -23,7 +23,6 @@ import { EsdtToken } from 'src/modules/tokens/models/esdtToken.model'; import { AssetsModel } from 'src/modules/tokens/models/assets.model'; import { RolesModel } from 'src/modules/tokens/models/roles.model'; import { PairAbiService } from '../services/pair.abi.service'; -import { ElasticService } from 'src/helpers/elastic.service'; import { RemoteConfigGetterServiceProvider } from 'src/modules/remote-config/mocks/remote-config.getter.mock'; import { StakingProxyAbiServiceProvider } from 'src/modules/staking-proxy/mocks/staking.proxy.abi.service.mock'; import { FarmAbiServiceProviderV2 } from 'src/modules/farm/mocks/farm.v2.abi.service.mock'; @@ -66,7 +65,6 @@ describe('PairService', () => { ContextGetterServiceProvider, ApiConfigService, MXApiServiceProvider, - ElasticService, FarmAbiServiceProviderV2, RemoteConfigGetterServiceProvider, StakingProxyAbiServiceProvider, diff --git a/src/modules/router/router.module.ts b/src/modules/router/router.module.ts index 9a2b43ab3..953d8dc5e 100644 --- a/src/modules/router/router.module.ts +++ b/src/modules/router/router.module.ts @@ -11,7 +11,6 @@ import { CommonAppModule } from 'src/common.app.module'; import { ContextModule } from 'src/services/context/context.module'; import { WrappingModule } from '../wrapping/wrap.module'; import { RemoteConfigModule } from '../remote-config/remote-config.module'; -import { ElasticService } from 'src/helpers/elastic.service'; import { SwapEnableConfigResolver } from './swap.enable.config.resolver'; import { SimpleLockModule } from '../simple-lock/simple.lock.module'; import { ESTransactionsService } from 'src/services/elastic-search/services/es.transactions.service'; @@ -35,7 +34,6 @@ import { AnalyticsModule } from '../analytics/analytics.module'; RouterComputeService, RouterTransactionService, ESTransactionsService, - ElasticService, SwapEnableConfigResolver, RouterResolver, ], diff --git a/src/modules/tokens/services/token.compute.service.ts b/src/modules/tokens/services/token.compute.service.ts index 8a227db18..bd8969a6a 100644 --- a/src/modules/tokens/services/token.compute.service.ts +++ b/src/modules/tokens/services/token.compute.service.ts @@ -23,12 +23,12 @@ import { QueryType, } from '@multiversx/sdk-nestjs-elastic'; import moment from 'moment'; -import { ESLogsService } from 'src/services/elastic-search/services/es.logs.service'; import { PendingExecutor } from 'src/utils/pending.executor'; import { CacheService } from '@multiversx/sdk-nestjs-cache'; import { TokenService } from './token.service'; import { computeValueUSD } from 'src/utils/token.converters'; import { getAllKeys } from 'src/utils/get.many.utils'; +import { ElasticSearchEventsService } from 'src/services/elastic-search/services/es.events.service'; @Injectable() export class TokenComputeService implements ITokenComputeService { @@ -53,8 +53,8 @@ export class TokenComputeService implements ITokenComputeService { private readonly dataApi: MXDataApiService, private readonly analyticsQuery: AnalyticsQueryService, private readonly elasticService: ElasticService, - private readonly logsElasticService: ESLogsService, private readonly cachingService: CacheService, + private readonly elasticEventsService: ElasticSearchEventsService, ) { this.swapCountExecutor = new PendingExecutor( async () => await this.allTokensSwapsCount(), @@ -704,11 +704,12 @@ export class TokenComputeService implements ITokenComputeService { ): Promise<{ tokenID: string; swapsCount: number }[]> { const pairAddresses = await this.routerAbi.pairsAddress(); - const allSwapsCount = await this.logsElasticService.getTokenSwapsCount( - start, - end, - pairAddresses, - ); + const allSwapsCount = + await this.elasticEventsService.getTokenSwapsCount( + start, + end, + pairAddresses, + ); const result = []; diff --git a/src/modules/tokens/token.module.ts b/src/modules/tokens/token.module.ts index c8b6bd73f..879a2282c 100644 --- a/src/modules/tokens/token.module.ts +++ b/src/modules/tokens/token.module.ts @@ -13,10 +13,8 @@ import { MXCommunicationModule } from 'src/services/multiversx-communication/mx. import { NftCollectionResolver } from './nftCollection.resolver'; import { NftTokenResolver } from './nftToken.resolver'; import { AnalyticsModule } from 'src/services/analytics/analytics.module'; -import { ElasticService } from 'src/helpers/elastic.service'; import { TokenFilteringService } from './services/token.filtering.service'; import { ElasticSearchModule } from 'src/services/elastic-search/elastic.search.module'; -import { ESLogsService } from 'src/services/elastic-search/services/es.logs.service'; import { TokenLoader } from './services/token.loader'; @Module({ @@ -41,9 +39,7 @@ import { TokenLoader } from './services/token.loader'; TokensResolver, NftCollectionResolver, NftTokenResolver, - ElasticService, TokenFilteringService, - ESLogsService, ], exports: [ TokenRepositoryService, diff --git a/src/private.app.module.ts b/src/private.app.module.ts index d41588c36..39eff5a97 100644 --- a/src/private.app.module.ts +++ b/src/private.app.module.ts @@ -1,7 +1,6 @@ import { Module } from '@nestjs/common'; import { CommonAppModule } from './common.app.module'; import { MetricsController } from './endpoints/metrics/metrics.controller'; -import { ElasticService } from './helpers/elastic.service'; import { PairModule } from './modules/pair/pair.module'; import { RemoteConfigController } from './modules/remote-config/remote-config.controller'; import { RemoteConfigModule } from './modules/remote-config/remote-config.module'; @@ -19,6 +18,6 @@ import { ESTransactionsService } from './services/elastic-search/services/es.tra DynamicModuleUtils.getCacheModule(), ], controllers: [MetricsController, TokenController, RemoteConfigController], - providers: [ElasticService, ESTransactionsService], + providers: [ESTransactionsService], }) export class PrivateAppModule {} diff --git a/src/services/cache.warmer.module.ts b/src/services/cache.warmer.module.ts index 33f6b870d..eb5e85fde 100644 --- a/src/services/cache.warmer.module.ts +++ b/src/services/cache.warmer.module.ts @@ -14,7 +14,6 @@ import { CommonAppModule } from 'src/common.app.module'; import { AnalyticsCacheWarmerService } from './crons/analytics.cache.warmer.service'; import { AnalyticsModule } from 'src/modules/analytics/analytics.module'; import { TransactionProcessorService } from './crons/transaction.processor.service'; -import { LogsProcessorService } from './crons/logs.processor.service'; import { StakingModule } from 'src/modules/staking/staking.module'; import { StakingCacheWarmerService } from './crons/staking.cache.warmer.service'; import { StakingProxyCacheWarmerService } from './crons/staking.proxy.cache.warmer.service'; @@ -31,7 +30,6 @@ import { FarmModuleV1_2 } from 'src/modules/farm/v1.2/farm.v1.2.module'; import { FarmModuleV1_3 } from 'src/modules/farm/v1.3/farm.v1.3.module'; import { FarmModule } from 'src/modules/farm/farm.module'; import { AnalyticsModule as AnalyticsServicesModule } from 'src/services/analytics/analytics.module'; -import { ElasticService } from 'src/helpers/elastic.service'; import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; import { GovernanceCacheWarmerService } from './crons/governance.cache.warmer.service'; import { GovernanceModule } from '../modules/governance/governance.module'; @@ -43,6 +41,8 @@ import { FeesCollectorCacheWarmerService } from './crons/fees.collector.cache.wa import { FeesCollectorModule } from 'src/modules/fees-collector/fees-collector.module'; import { WeekTimekeepingModule } from 'src/submodules/week-timekeeping/week-timekeeping.module'; import { WeeklyRewardsSplittingModule } from 'src/submodules/weekly-rewards-splitting/weekly-rewards-splitting.module'; +import { ElasticSearchModule } from './elastic-search/elastic.search.module'; +import { EventsProcessorService } from './crons/events.processor.service'; @Module({ imports: [ @@ -73,6 +73,7 @@ import { WeeklyRewardsSplittingModule } from 'src/submodules/weekly-rewards-spli FeesCollectorModule, WeekTimekeepingModule, WeeklyRewardsSplittingModule, + ElasticSearchModule, ], controllers: [], providers: [ @@ -88,8 +89,7 @@ import { WeeklyRewardsSplittingModule } from 'src/submodules/weekly-rewards-spli PriceDiscoveryCacheWarmerService, GovernanceCacheWarmerService, TransactionProcessorService, - LogsProcessorService, - ElasticService, + EventsProcessorService, TokensCacheWarmerService, EscrowCacheWarmerService, FeesCollectorCacheWarmerService, diff --git a/src/services/crons/logs.processor.service.ts b/src/services/crons/events.processor.service.ts similarity index 55% rename from src/services/crons/logs.processor.service.ts rename to src/services/crons/events.processor.service.ts index fa2fb8559..7a5bf5ca7 100644 --- a/src/services/crons/logs.processor.service.ts +++ b/src/services/crons/events.processor.service.ts @@ -4,7 +4,6 @@ import { CacheService } from '@multiversx/sdk-nestjs-cache'; import { MXApiService } from '../multiversx-communication/mx.api.service'; import { cacheConfig, constantsConfig } from 'src/config'; import BigNumber from 'bignumber.js'; -import { Constants } from '@multiversx/sdk-nestjs-common'; import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; import { Logger } from 'winston'; import { generateLogMessage } from 'src/utils/generate-log-message'; @@ -13,21 +12,22 @@ import { EsdtLocalBurnEvent, ExitFarmEventV1_2, ExitFarmEventV1_3, + ExitFarmEventV2, + TRANSACTION_EVENTS, } from '@multiversx/sdk-exchange'; import { farmVersion } from 'src/utils/farm.utils'; import { FarmVersion } from 'src/modules/farm/models/farm.model'; import { AnalyticsWriteService } from '../analytics/services/analytics.write.service'; import { ApiConfigService } from 'src/helpers/api.config.service'; import { IngestRecord } from '../analytics/entities/ingest.record'; -import { - ElasticQuery, - ElasticSortOrder, - QueryType, -} from '@multiversx/sdk-nestjs-elastic'; -import { ElasticService } from 'src/helpers/elastic.service'; +import { SWAP_IDENTIFIER } from 'src/modules/rabbitmq/handlers/pair.swap.handler.service'; +import { ElasticSearchEventsService } from '../elastic-search/services/es.events.service'; +import { RawElasticEventType } from '../elastic-search/entities/raw.elastic.event'; +import { convertEventTopicsAndDataToBase64 } from 'src/utils/elastic.search.utils'; +import { Constants } from '@multiversx/sdk-nestjs-common'; @Injectable() -export class LogsProcessorService { +export class EventsProcessorService { isProcessing = false; feeMap: Map = new Map(); penaltyMap: Map = new Map(); @@ -35,14 +35,14 @@ export class LogsProcessorService { constructor( private readonly cachingService: CacheService, private readonly apiService: MXApiService, - private readonly elasticService: ElasticService, + private readonly elasticEventsService: ElasticSearchEventsService, private readonly analyticsWrite: AnalyticsWriteService, private readonly apiConfig: ApiConfigService, @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, ) {} @Cron(CronExpression.EVERY_MINUTE) - async handleNewLogs() { + async handleNewEvents() { if (this.isProcessing || process.env.NODE_ENV === 'shadowfork2') { return; } @@ -57,7 +57,7 @@ export class LogsProcessorService { } await this.getFeeBurned(currentTimestamp, lastProcessedTimestamp); - await this.getExitFarmLogs( + await this.getPenaltyBurned( currentTimestamp, lastProcessedTimestamp, ); @@ -69,8 +69,8 @@ export class LogsProcessorService { ); } catch (error) { const logMessage = generateLogMessage( - LogsProcessorService.name, - this.handleNewLogs.name, + EventsProcessorService.name, + this.handleNewEvents.name, '', error, ); @@ -112,15 +112,10 @@ export class LogsProcessorService { private async getFeeBurned( currentTimestamp: number, lastProcessedTimestamp: number, - ) { + ): Promise { this.feeMap.clear(); - await this.getSwapLogs( - 'swapTokensFixedInput', - currentTimestamp, - lastProcessedTimestamp, - ); - await this.getSwapLogs( - 'swapTokensFixedOutput', + + await this.getSwapAndBurnEvents( currentTimestamp, lastProcessedTimestamp, ); @@ -133,86 +128,168 @@ export class LogsProcessorService { this.logger.info(`fee burned records: ${totalWriteRecords}`); } - private async getSwapLogs( - swapType: string, + private async getPenaltyBurned( currentTimestamp: number, lastProcessedTimestamp: number, - ) { - const transactionsLogs = await this.getTransactionsLogs( - swapType, + ): Promise { + this.penaltyMap.clear(); + + await this.getExitFarmAndBurnEvents( currentTimestamp, lastProcessedTimestamp, ); - for (const transactionLogs of transactionsLogs) { - const timestamp: number = transactionLogs._source.timestamp; - const events = transactionLogs._source.events; - this.processSwapEvents(events, timestamp); - } + const totalWriteRecords = await this.writeRecords( + this.penaltyMap, + 'penaltyBurned', + ); + + this.logger.info(`penalty burned records ${totalWriteRecords}`); } - private async getExitFarmLogs( + private async getSwapAndBurnEvents( currentTimestamp: number, lastProcessedTimestamp: number, - ) { - const transactionsLogs = await this.getTransactionsLogs( - 'exitFarm', + ): Promise { + const transactionsEvents: Map = + new Map(); + const processEventsAction = async (events: any[]): Promise => { + for (const originalEvent of events) { + const event = convertEventTopicsAndDataToBase64(originalEvent); + + if (transactionsEvents.has(event.txHash)) { + const txEvents = transactionsEvents.get(event.txHash); + txEvents.push(event); + transactionsEvents.set(event.txHash, txEvents); + } else { + transactionsEvents.set(event.txHash, [event]); + } + } + }; + + await this.elasticEventsService.getEvents( + [ + SWAP_IDENTIFIER.SWAP_FIXED_INPUT, + SWAP_IDENTIFIER.SWAP_FIXED_OUTPUT, + TRANSACTION_EVENTS.ESDT_LOCAL_BURN, + ], + lastProcessedTimestamp, currentTimestamp, + processEventsAction, + 500, + ); + + this.processSwapTransactionsEvents(transactionsEvents); + } + + private async getExitFarmAndBurnEvents( + currentTimestamp: number, + lastProcessedTimestamp: number, + ): Promise { + const transactionsEvents: Map = + new Map(); + const processEventsAction = async (events: any[]): Promise => { + for (const originalEvent of events) { + const event = convertEventTopicsAndDataToBase64(originalEvent); + + if (transactionsEvents.has(event.txHash)) { + const txEvents = transactionsEvents.get(event.txHash); + txEvents.push(event); + transactionsEvents.set(event.txHash, txEvents); + } else { + transactionsEvents.set(event.txHash, [event]); + } + } + }; + + await this.elasticEventsService.getEvents( + ['exitFarm', TRANSACTION_EVENTS.ESDT_LOCAL_BURN], lastProcessedTimestamp, + currentTimestamp, + processEventsAction, + 500, ); - this.penaltyMap.clear(); + this.processExitFarmTransactionsEvents(transactionsEvents); + } + + private processSwapTransactionsEvents( + transactionEvents: Map, + ): void { + const txHashes = transactionEvents.keys(); + for (const txHash of txHashes) { + const events = transactionEvents.get(txHash); + + let hasSwapEvents = false; + let timestamp: number; + const burnEvents: EsdtLocalBurnEvent[] = []; + + events.forEach((event) => { + if (event.identifier === TRANSACTION_EVENTS.ESDT_LOCAL_BURN) { + burnEvents.push(new EsdtLocalBurnEvent(event)); + } else { + hasSwapEvents = true; + timestamp = event.timestamp; + } + }); - for (const transactionLogs of transactionsLogs) { - const timestamp = transactionLogs._source.timestamp; - const events = transactionLogs._source.events; + if (!hasSwapEvents || burnEvents.length === 0) { + continue; + } - this.processExitFarmEvents(events, timestamp); + this.processSwapLocalBurnEvents(burnEvents, timestamp); } + } - const totalWriteRecords = await this.writeRecords( - this.penaltyMap, - 'penaltyBurned', - ); + private processExitFarmTransactionsEvents( + transactionEvents: Map, + ): void { + const txHashes = transactionEvents.keys(); + for (const txHash of txHashes) { + const events = transactionEvents.get(txHash); + + const burnEvents: EsdtLocalBurnEvent[] = []; + let exitFarmEvent: BaseFarmEvent | ExitFarmEventV2 | undefined = + undefined; + let timestamp: number; + + events.forEach((event) => { + switch (event.identifier) { + case 'exitFarm': + if (event.data === '') { + break; + } + const version = farmVersion(event.address); + switch (version) { + case FarmVersion.V1_2: + exitFarmEvent = new ExitFarmEventV1_2(event); + break; + case FarmVersion.V1_3: + exitFarmEvent = new ExitFarmEventV1_3(event); + break; + case FarmVersion.V2: + if (event.topics.length !== 6) { + break; + } + exitFarmEvent = new ExitFarmEventV2(event); + break; + } + timestamp = event.timestamp; + break; + case TRANSACTION_EVENTS.ESDT_LOCAL_BURN: + burnEvents.push(new EsdtLocalBurnEvent(event)); + break; + default: + break; + } + }); - this.logger.info(`penalty burned records ${totalWriteRecords}`); - } + if (exitFarmEvent === undefined) { + continue; + } - private async getTransactionsLogs( - eventName: string, - currentTimestamp: number, - lastProcessedTimestamp: number, - ): Promise { - const elasticQueryAdapter: ElasticQuery = new ElasticQuery(); - elasticQueryAdapter.condition.must = [ - QueryType.Nested('events', [ - QueryType.Match('events.identifier', eventName), - ]), - ]; - - elasticQueryAdapter.filter = [ - QueryType.Range( - 'timestamp', - { - key: 'gte', - value: lastProcessedTimestamp, - }, - { - key: 'lte', - value: currentTimestamp, - }, - ), - ]; - - elasticQueryAdapter.sort = [ - { name: 'timestamp', order: ElasticSortOrder.ascending }, - ]; - - return await this.elasticService.getList( - 'logs', - '', - elasticQueryAdapter, - ); + this.processExitFarmEvents(exitFarmEvent, burnEvents, timestamp); + } } private async writeRecords( @@ -259,7 +336,7 @@ export class LogsProcessorService { return Records.length; } catch (error) { const logMessage = generateLogMessage( - LogsProcessorService.name, + EventsProcessorService.name, this.writeRecords.name, '', error, @@ -284,20 +361,11 @@ export class LogsProcessorService { return fee.toFixed(); } - private processSwapEvents(events: any[], timestamp: number): Promise { - const esdtLocalBurnEvents: EsdtLocalBurnEvent[] = []; - - for (const event of events) { - switch (event.identifier) { - case 'ESDTLocalBurn': - esdtLocalBurnEvents.push(new EsdtLocalBurnEvent(event)); - break; - default: - break; - } - } - - const feeBurned = this.getBurnedFee(esdtLocalBurnEvents); + private processSwapLocalBurnEvents( + events: EsdtLocalBurnEvent[], + timestamp: number, + ): void { + const feeBurned = this.getBurnedFee(events); if (feeBurned === '0') { return; @@ -316,49 +384,17 @@ export class LogsProcessorService { } private processExitFarmEvents( - events: any[], + exitFarmEvent: BaseFarmEvent | ExitFarmEventV2, + burnEvents: EsdtLocalBurnEvent[] = [], timestamp: number, ): Promise { - let exitFarmEvent: BaseFarmEvent | undefined = undefined; - const esdtLocalBurnEvents: EsdtLocalBurnEvent[] = []; + const penalty = this.getBurnedPenalty(exitFarmEvent, burnEvents); - for (const event of events) { - switch (event.identifier) { - case 'exitFarm': - if (event.data === '') { - break; - } - const version = farmVersion(event.address); - switch (version) { - case FarmVersion.V1_2: - exitFarmEvent = new ExitFarmEventV1_2(event); - break; - case FarmVersion.V1_3: - exitFarmEvent = new ExitFarmEventV1_3(event); - break; - } - break; - case 'ESDTLocalBurn': - esdtLocalBurnEvents.push(new EsdtLocalBurnEvent(event)); - break; - default: - break; - } - } - - if (exitFarmEvent === undefined) { + if (penalty === '0') { return; } const penaltyEntry = this.penaltyMap.get(timestamp); - const penalty = this.getBurnedPenalty( - exitFarmEvent, - esdtLocalBurnEvents, - ); - - if (penalty === '0') { - return; - } if (penaltyEntry) { this.penaltyMap.set( @@ -371,6 +407,41 @@ export class LogsProcessorService { } private getBurnedPenalty( + exitFarmEvent: BaseFarmEvent | ExitFarmEventV2, + esdtLocalBurnEvents: EsdtLocalBurnEvent[], + ): string { + if (!(exitFarmEvent instanceof ExitFarmEventV2)) { + return this.getBurnedPenaltyOldFarms( + exitFarmEvent, + esdtLocalBurnEvents, + ); + } + + let penalty = new BigNumber(0); + + for (const localBurn of esdtLocalBurnEvents) { + const burnedTokenID = localBurn.getTopics().tokenID; + const burnedAmount = localBurn.getTopics().amount; + + // Skip LP tokens burn + if (burnedTokenID !== constantsConfig.MEX_TOKEN_ID) { + continue; + } + + if ( + burnedAmount === exitFarmEvent.farmingToken.amount && + burnedTokenID === exitFarmEvent.farmingToken.tokenIdentifier + ) { + continue; + } + + penalty = penalty.plus(burnedAmount); + } + + return penalty.toFixed(); + } + + private getBurnedPenaltyOldFarms( exitFarmEvent: BaseFarmEvent, esdtLocalBurnEvents: EsdtLocalBurnEvent[], ): string { diff --git a/src/services/elastic-search/elastic.search.module.ts b/src/services/elastic-search/elastic.search.module.ts index 7378c1843..2a3759f9d 100644 --- a/src/services/elastic-search/elastic.search.module.ts +++ b/src/services/elastic-search/elastic.search.module.ts @@ -1,8 +1,8 @@ import { Module } from '@nestjs/common'; import { ESTransactionsService } from './services/es.transactions.service'; -import { ESLogsService } from './services/es.logs.service'; import { CommonAppModule } from 'src/common.app.module'; import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; +import { ElasticSearchEventsService } from './services/es.events.service'; @Module({ imports: [ @@ -10,7 +10,7 @@ import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; DynamicModuleUtils.getApiModule(), DynamicModuleUtils.getElasticModule(), ], - providers: [ESTransactionsService, ESLogsService], - exports: [ESTransactionsService, ESLogsService], + providers: [ESTransactionsService, ElasticSearchEventsService], + exports: [ESTransactionsService, ElasticSearchEventsService], }) export class ElasticSearchModule {} diff --git a/src/services/elastic-search/entities/raw.elastic.event.ts b/src/services/elastic-search/entities/raw.elastic.event.ts new file mode 100644 index 000000000..73ceec06f --- /dev/null +++ b/src/services/elastic-search/entities/raw.elastic.event.ts @@ -0,0 +1,10 @@ +import { RawEventType } from '@multiversx/sdk-exchange'; + +export type RawElasticEventType = RawEventType & { + logAddress: string; + shardID: number; + timestamp: number; + txOrder: number; + txHash: string; + order: number; +}; diff --git a/src/services/elastic-search/services/es.events.service.ts b/src/services/elastic-search/services/es.events.service.ts new file mode 100644 index 000000000..6642ec2cb --- /dev/null +++ b/src/services/elastic-search/services/es.events.service.ts @@ -0,0 +1,197 @@ +import { + ElasticPagination, + ElasticQuery, + ElasticService, + ElasticSortOrder, + QueryType, +} from '@multiversx/sdk-nestjs-elastic'; +import { Inject, Injectable } from '@nestjs/common'; +import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; +import { SWAP_IDENTIFIER } from 'src/modules/rabbitmq/handlers/pair.swap.handler.service'; +import { Logger } from 'winston'; +import { RawElasticEventType } from '../entities/raw.elastic.event'; +import { SwapEvent } from '@multiversx/sdk-exchange'; +import { convertEventTopicsAndDataToBase64 } from 'src/utils/elastic.search.utils'; + +@Injectable() +export class ElasticSearchEventsService { + constructor( + private readonly elasticService: ElasticService, + @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, + ) {} + + async getPairSwapCount( + address: string, + start?: number, + end?: number, + ): Promise { + const elasticQueryAdapter: ElasticQuery = new ElasticQuery(); + + elasticQueryAdapter.condition.must = [ + QueryType.Match('address', address), + QueryType.Should([ + QueryType.Match('identifier', SWAP_IDENTIFIER.SWAP_FIXED_INPUT), + QueryType.Match( + 'identifier', + SWAP_IDENTIFIER.SWAP_FIXED_OUTPUT, + ), + ]), + ]; + + if (start && end) { + elasticQueryAdapter.filter = [ + QueryType.Range( + 'timestamp', + { + key: 'gte', + value: start, + }, + { + key: 'lte', + value: end, + }, + ), + ]; + } + + return await this.elasticService.getCount( + 'events', + elasticQueryAdapter, + ); + } + + async getTokenSwapsCount( + startTimestamp: number, + endTimestamp: number, + pairAddresses: string[], + ): Promise> { + const tokensSwapCountMap: Map = new Map(); + + const processSwapEventsAction = async ( + events: any[], + ): Promise => { + for (const event of events) { + if (!pairAddresses.includes(event.address)) { + continue; + } + const swapEvent = new SwapEvent( + convertEventTopicsAndDataToBase64(event), + ); + + const firstTokenID = swapEvent.getTokenIn().tokenID; + const secondTokenID = swapEvent.getTokenOut().tokenID; + + if (tokensSwapCountMap.has(firstTokenID)) { + tokensSwapCountMap.set( + firstTokenID, + tokensSwapCountMap.get(firstTokenID) + 1, + ); + } else { + tokensSwapCountMap.set(firstTokenID, 1); + } + + if (tokensSwapCountMap.has(secondTokenID)) { + tokensSwapCountMap.set( + secondTokenID, + tokensSwapCountMap.get(secondTokenID) + 1, + ); + } else { + tokensSwapCountMap.set(secondTokenID, 1); + } + } + }; + + await this.getEvents( + [ + SWAP_IDENTIFIER.SWAP_FIXED_INPUT, + SWAP_IDENTIFIER.SWAP_FIXED_OUTPUT, + ], + startTimestamp, + endTimestamp, + processSwapEventsAction, + 500, + ); + + return tokensSwapCountMap; + } + + async getGovernanceVotes( + scAddress: string, + callerAddressHex: string, + proposalIdHex: string, + ): Promise { + const result: RawElasticEventType[] = []; + const processEventsAction = async (events: any[]): Promise => { + result.push(...events); + }; + + const elasticQueryAdapter: ElasticQuery = new ElasticQuery(); + + elasticQueryAdapter.condition.must = [ + QueryType.Match('address', scAddress), + QueryType.Match('identifier', 'vote'), + QueryType.Match('topics', proposalIdHex), + QueryType.Match('topics', callerAddressHex), + ]; + + elasticQueryAdapter.sort = [ + { name: 'timestamp', order: ElasticSortOrder.ascending }, + ]; + + await this.elasticService.getScrollableList( + 'events', + '', + elasticQueryAdapter, + processEventsAction, + ); + + return result; + } + + async getEvents( + eventIdentifiers: string[], + startTimestamp: number, + endTimestamp: number, + action: (items: any[]) => Promise, + size = 100, + ): Promise { + const pagination = new ElasticPagination(); + pagination.size = size; + + const elasticQueryAdapter: ElasticQuery = + new ElasticQuery().withPagination(pagination); + + elasticQueryAdapter.condition.must = [ + QueryType.Should( + eventIdentifiers.map((identifier) => + QueryType.Match('identifier', identifier), + ), + ), + ]; + + elasticQueryAdapter.filter = [ + QueryType.Range( + 'timestamp', + { + key: 'gte', + value: startTimestamp, + }, + { + key: 'lte', + value: endTimestamp, + }, + ), + ]; + + elasticQueryAdapter.sort = [ + { name: 'timestamp', order: ElasticSortOrder.ascending }, + ]; + + await this.elasticService.getScrollableList( + 'events', + '', + elasticQueryAdapter, + action, + ); + } +} diff --git a/src/services/elastic-search/services/es.logs.service.ts b/src/services/elastic-search/services/es.logs.service.ts deleted file mode 100644 index b4a0b29bf..000000000 --- a/src/services/elastic-search/services/es.logs.service.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { SwapEvent } from '@multiversx/sdk-exchange'; -import { - ElasticQuery, - ElasticService, - ElasticSortOrder, - QueryType, -} from '@multiversx/sdk-nestjs-elastic'; -import { Inject, Injectable } from '@nestjs/common'; -import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; -import { Logger } from 'winston'; - -@Injectable() -export class ESLogsService { - constructor( - private readonly elasticService: ElasticService, - @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, - ) {} - - async getTokenSwapsCount( - startTimestamp: number, - endTimestamp: number, - pairAddresses: string[], - ): Promise> { - const swapEvents: SwapEvent[] = []; - const tokensSwapCountMap: Map = new Map(); - - const processLogsAction = async (items: any[]): Promise => { - for (const transactionLogs of items) { - swapEvents.push( - ...this.processSwapEvents( - transactionLogs.events, - pairAddresses, - ), - ); - } - }; - - await Promise.all([ - this.getTransactionsLogs( - 'swapTokensFixedInput', - startTimestamp, - endTimestamp, - processLogsAction, - ), - this.getTransactionsLogs( - 'swapTokensFixedOutput', - startTimestamp, - endTimestamp, - processLogsAction, - ), - ]); - - for (const swapEvent of swapEvents) { - const eventTopics = swapEvent.getTopics(); - - if (tokensSwapCountMap.has(eventTopics.firstTokenID)) { - const currentCount = tokensSwapCountMap.get( - eventTopics.firstTokenID, - ); - tokensSwapCountMap.set( - eventTopics.firstTokenID, - currentCount + 1, - ); - } else { - tokensSwapCountMap.set(eventTopics.firstTokenID, 1); - } - - if (tokensSwapCountMap.has(eventTopics.secondTokenID)) { - const currentCount = tokensSwapCountMap.get( - eventTopics.secondTokenID, - ); - tokensSwapCountMap.set( - eventTopics.secondTokenID, - currentCount + 1, - ); - } else { - tokensSwapCountMap.set(eventTopics.secondTokenID, 1); - } - } - - return tokensSwapCountMap; - } - - private async getTransactionsLogs( - eventName: string, - startTimestamp: number, - endTimestamp: number, - action: (items: any[]) => Promise, - ): Promise { - const elasticQueryAdapter: ElasticQuery = new ElasticQuery(); - elasticQueryAdapter.condition.must = [ - QueryType.Nested('events', [ - QueryType.Match('events.identifier', eventName), - ]), - ]; - - elasticQueryAdapter.filter = [ - QueryType.Range( - 'timestamp', - { - key: 'gte', - value: startTimestamp, - }, - { - key: 'lte', - value: endTimestamp, - }, - ), - ]; - - elasticQueryAdapter.sort = [ - { name: 'timestamp', order: ElasticSortOrder.ascending }, - ]; - - await this.elasticService.getScrollableList( - 'logs', - '', - elasticQueryAdapter, - action, - ); - } - - private processSwapEvents( - events: any[], - pairAddresses: string[], - ): SwapEvent[] { - const esdtSwapEvents: SwapEvent[] = []; - - for (const event of events) { - if (!pairAddresses.includes(event.address)) { - continue; - } - if ( - event.identifier === 'swapTokensFixedInput' || - event.identifier === 'swapTokensFixedOutput' - ) { - esdtSwapEvents.push(new SwapEvent(event)); - } - } - - return esdtSwapEvents; - } -} diff --git a/src/utils/elastic.search.utils.ts b/src/utils/elastic.search.utils.ts new file mode 100644 index 000000000..b8e8186e6 --- /dev/null +++ b/src/utils/elastic.search.utils.ts @@ -0,0 +1,22 @@ +import { RawElasticEventType } from 'src/services/elastic-search/entities/raw.elastic.event'; + +export const convertEventTopicsAndDataToBase64 = ( + originalEvent: RawElasticEventType, +): RawElasticEventType => { + if (originalEvent.topics && Array.isArray(originalEvent.topics)) { + const convertedTopics: string[] = []; + for (const topic of originalEvent.topics) { + const base64Topic = Buffer.from(topic, 'hex').toString('base64'); + convertedTopics.push(base64Topic); + } + originalEvent.topics = convertedTopics; + } + + if (originalEvent.data && originalEvent.data.length > 0) { + originalEvent.data = Buffer.from(originalEvent.data, 'hex').toString( + 'base64', + ); + } + + return originalEvent; +};