From 06948550374cf240198cbb4adefc1f02bc8077f6 Mon Sep 17 00:00:00 2001 From: hschiau Date: Wed, 23 Oct 2024 19:45:10 +0300 Subject: [PATCH 1/3] SERVICES-2668: add indexer session mongodb schema and repository --- .../analytics.indexer.module.ts | 12 +++++ .../schemas/indexer.session.schema.ts | 52 +++++++++++++++++++ .../indexer.session.repository.ts | 18 +++++++ 3 files changed, 82 insertions(+) create mode 100644 src/modules/analytics-indexer/schemas/indexer.session.schema.ts create mode 100644 src/services/database/repositories/indexer.session.repository.ts diff --git a/src/modules/analytics-indexer/analytics.indexer.module.ts b/src/modules/analytics-indexer/analytics.indexer.module.ts index 10bf1d0fc..4b2463c00 100644 --- a/src/modules/analytics-indexer/analytics.indexer.module.ts +++ b/src/modules/analytics-indexer/analytics.indexer.module.ts @@ -13,6 +13,13 @@ import { IndexerPriceDiscoveryService } from './services/indexer.price.discovery import { IndexerSwapHandlerService } from './services/event-handlers/indexer.swap.handler.service'; import { IndexerLiquidityHandlerService } from './services/event-handlers/indexer.liquidity.handler.service'; import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/indexer.price.discovery.handler.service'; +import { DatabaseModule } from 'src/services/database/database.module'; +import { MongooseModule } from '@nestjs/mongoose'; +import { + IndexerSession, + IndexerSessionSchema, +} from './schemas/indexer.session.schema'; +import { IndexerSessionRepositoryService } from 'src/services/database/repositories/indexer.session.repository'; @Module({ imports: [ @@ -21,6 +28,10 @@ import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/i TokenModule, PriceDiscoveryModule, ElasticSearchModule, + DatabaseModule, + MongooseModule.forFeature([ + { name: IndexerSession.name, schema: IndexerSessionSchema }, + ]), ], providers: [ IndexerService, @@ -32,6 +43,7 @@ import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/i IndexerSwapHandlerService, IndexerLiquidityHandlerService, IndexerPriceDiscoveryHandlerService, + IndexerSessionRepositoryService, ], exports: [IndexerService], }) diff --git a/src/modules/analytics-indexer/schemas/indexer.session.schema.ts b/src/modules/analytics-indexer/schemas/indexer.session.schema.ts new file mode 100644 index 000000000..6d03a72e4 --- /dev/null +++ b/src/modules/analytics-indexer/schemas/indexer.session.schema.ts @@ -0,0 +1,52 @@ +import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose'; +import { Document } from 'mongoose'; +import { IndexerEventTypes } from '../entities/indexer.event.types'; + +export enum IndexerStatus { + PENDING = 'PENDING', + IN_PROGRESS = 'IN_PROGRESS', + COMPLETED = 'COMPLETED', + FAILED = 'FAILED', + ABORTED = 'ABORTED', +} + +export class IndexerJob { + startTimestamp: number; + endTimestamp: number; + order: number; + status: IndexerStatus; + runAttempts = 0; + errorCount = 0; + durationMs = 0; + + constructor(init?: Partial) { + Object.assign(this, init); + } +} + +export type IndexerSessionDocument = IndexerSession & Document; + +@Schema({ + collection: 'indexer_sessions', +}) +export class IndexerSession { + @Prop({ required: true, unique: true }) + name: string; + @Prop({ required: true }) + startTimestamp: number; + @Prop({ required: true }) + endTimestamp: number; + @Prop(() => [IndexerEventTypes]) + eventTypes: IndexerEventTypes[]; + @Prop(() => [IndexerJob]) + jobs: IndexerJob[]; + @Prop(() => IndexerStatus) + status: IndexerStatus; + + constructor(init?: Partial) { + Object.assign(this, init); + } +} + +export const IndexerSessionSchema = + SchemaFactory.createForClass(IndexerSession); diff --git a/src/services/database/repositories/indexer.session.repository.ts b/src/services/database/repositories/indexer.session.repository.ts new file mode 100644 index 000000000..a209adb9a --- /dev/null +++ b/src/services/database/repositories/indexer.session.repository.ts @@ -0,0 +1,18 @@ +import { Injectable } from '@nestjs/common'; +import { InjectModel } from '@nestjs/mongoose'; +import { Model } from 'mongoose'; +import { EntityRepository } from './entity.repository'; +import { + IndexerSession, + IndexerSessionDocument, +} from 'src/modules/analytics-indexer/schemas/indexer.session.schema'; + +@Injectable() +export class IndexerSessionRepositoryService extends EntityRepository { + constructor( + @InjectModel(IndexerSession.name) + private readonly indexerSessionModel: Model, + ) { + super(indexerSessionModel); + } +} From 013e7fa2509eceee1dc08fc7fd72628f74b32e6f Mon Sep 17 00:00:00 2001 From: hschiau Date: Wed, 23 Oct 2024 19:56:03 +0300 Subject: [PATCH 2/3] SERVICES-2668: add indexer controller for managing sessions - expose controller endpoints in private app --- .../analytics.indexer.controller.ts | 85 ++++++++++++++++++ .../analytics.indexer.module.ts | 2 + .../entities/create.session.dto.ts | 25 ++++++ .../services/indexer.persistence.service.ts | 89 +++++++++++++++++++ src/private.app.module.ts | 2 + 5 files changed, 203 insertions(+) create mode 100644 src/modules/analytics-indexer/analytics.indexer.controller.ts create mode 100644 src/modules/analytics-indexer/entities/create.session.dto.ts create mode 100644 src/modules/analytics-indexer/services/indexer.persistence.service.ts diff --git a/src/modules/analytics-indexer/analytics.indexer.controller.ts b/src/modules/analytics-indexer/analytics.indexer.controller.ts new file mode 100644 index 000000000..32a0d81e2 --- /dev/null +++ b/src/modules/analytics-indexer/analytics.indexer.controller.ts @@ -0,0 +1,85 @@ +import { + Body, + Controller, + Get, + HttpException, + HttpStatus, + Param, + Post, + UseGuards, + ValidationPipe, +} from '@nestjs/common'; +import mongoose from 'mongoose'; +import { CacheService } from '@multiversx/sdk-nestjs-cache'; +import { Constants } from '@multiversx/sdk-nestjs-common'; +import { IndexerSessionRepositoryService } from 'src/services/database/repositories/indexer.session.repository'; +import { IndexerSession } from './schemas/indexer.session.schema'; +import { CreateSessionDto } from './entities/create.session.dto'; +import { IndexerPersistenceService } from './services/indexer.persistence.service'; +import { JwtOrNativeAdminGuard } from '../auth/jwt.or.native.admin.guard'; + +@Controller('analytics-indexer') +export class AnalyticsIndexerController { + constructor( + private readonly indexerSessionRepository: IndexerSessionRepositoryService, + private readonly indexerPersistenceService: IndexerPersistenceService, + private readonly cachingService: CacheService, + ) {} + + @UseGuards(JwtOrNativeAdminGuard) + @Get('/sessions') + async getSessions(): Promise { + return await this.indexerSessionRepository.find({}); + } + + @UseGuards(JwtOrNativeAdminGuard) + @Post('/sessions') + async addSession( + @Body( + new ValidationPipe({ whitelist: true, forbidNonWhitelisted: true }), + ) + createSessionDto: CreateSessionDto, + ): Promise { + try { + return await this.indexerPersistenceService.createIndexerSession( + createSessionDto, + ); + } catch (error) { + throw new HttpException(error.message, HttpStatus.BAD_REQUEST); + } + } + + @UseGuards(JwtOrNativeAdminGuard) + @Get('/sessions/:nameOrID') + async getSession( + @Param('nameOrID') nameOrID: string, + ): Promise { + return await this.indexerSessionRepository.findOne( + mongoose.Types.ObjectId.isValid(nameOrID) + ? { _id: nameOrID } + : { name: nameOrID }, + ); + } + + @UseGuards(JwtOrNativeAdminGuard) + @Post('/sessions/:nameOrID/abort') + async abortSession(@Param('nameOrID') nameOrID: string): Promise { + const session = await this.indexerSessionRepository.findOne( + mongoose.Types.ObjectId.isValid(nameOrID) + ? { _id: nameOrID } + : { name: nameOrID }, + ); + + if (!session) { + throw new HttpException('Session not found', HttpStatus.NOT_FOUND); + } + + this.cachingService.set( + `indexer.abortSession.${session.name}`, + true, + Constants.oneHour(), + ); + + return true; + } +} diff --git a/src/modules/analytics-indexer/analytics.indexer.module.ts b/src/modules/analytics-indexer/analytics.indexer.module.ts index 4b2463c00..decf9eeb6 100644 --- a/src/modules/analytics-indexer/analytics.indexer.module.ts +++ b/src/modules/analytics-indexer/analytics.indexer.module.ts @@ -13,6 +13,7 @@ import { IndexerPriceDiscoveryService } from './services/indexer.price.discovery import { IndexerSwapHandlerService } from './services/event-handlers/indexer.swap.handler.service'; import { IndexerLiquidityHandlerService } from './services/event-handlers/indexer.liquidity.handler.service'; import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/indexer.price.discovery.handler.service'; +import { AnalyticsIndexerController } from './analytics.indexer.controller'; import { DatabaseModule } from 'src/services/database/database.module'; import { MongooseModule } from '@nestjs/mongoose'; import { @@ -46,5 +47,6 @@ import { IndexerSessionRepositoryService } from 'src/services/database/repositor IndexerSessionRepositoryService, ], exports: [IndexerService], + controllers: [AnalyticsIndexerController], }) export class AnalyticsIndexerModule {} diff --git a/src/modules/analytics-indexer/entities/create.session.dto.ts b/src/modules/analytics-indexer/entities/create.session.dto.ts new file mode 100644 index 000000000..3b18e7813 --- /dev/null +++ b/src/modules/analytics-indexer/entities/create.session.dto.ts @@ -0,0 +1,25 @@ +import { + ArrayMinSize, + ArrayUnique, + IsArray, + IsEnum, + IsInt, + IsOptional, + IsPositive, +} from 'class-validator'; +import { IndexerEventTypes } from './indexer.event.types'; + +export class CreateSessionDto { + @IsInt() + @IsPositive() + start: number; + @IsOptional() + @IsInt() + @IsPositive() + end?: number; + @IsArray() + @ArrayMinSize(1) + @ArrayUnique() + @IsEnum(IndexerEventTypes, { each: true }) + eventTypes: IndexerEventTypes[]; +} diff --git a/src/modules/analytics-indexer/services/indexer.persistence.service.ts b/src/modules/analytics-indexer/services/indexer.persistence.service.ts new file mode 100644 index 000000000..044362eb4 --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.persistence.service.ts @@ -0,0 +1,89 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; +import { Logger } from 'winston'; +import { + IndexerJob, + IndexerSession, + IndexerStatus, +} from '../schemas/indexer.session.schema'; +import moment from 'moment'; +import { Constants } from '@multiversx/sdk-nestjs-common'; +import { IndexerSessionRepositoryService } from 'src/services/database/repositories/indexer.session.repository'; +import { CreateSessionDto } from '../entities/create.session.dto'; + +@Injectable() +export class IndexerPersistenceService { + constructor( + private readonly indexerSessionRepository: IndexerSessionRepositoryService, + @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, + ) {} + + public async createIndexerSession( + createSessionDto: CreateSessionDto, + ): Promise { + const { start } = createSessionDto; + + const end = !createSessionDto.end + ? moment().unix() + : createSessionDto.end; + + if (start >= end) { + throw new Error('End timestamp should be after start'); + } + + const activeSession = await this.getActiveSession(); + + if (activeSession) { + throw new Error( + 'Cannot start a session while another one is in progress.', + ); + } + + return await this.indexerSessionRepository.create({ + name: `Session_${moment().unix()}`, + startTimestamp: start, + endTimestamp: end, + eventTypes: createSessionDto.eventTypes, + jobs: this.createSessionJobs(start, end), + status: IndexerStatus.PENDING, + }); + } + + public async getActiveSession(): Promise { + return await this.indexerSessionRepository.findOne({ + status: { + $in: [IndexerStatus.IN_PROGRESS, IndexerStatus.PENDING], + }, + }); + } + + private createSessionJobs(start: number, end: number): IndexerJob[] { + const jobs: IndexerJob[] = []; + const oneWeek = Constants.oneWeek(); + + let currentStart = start; + let order = 0; + + while (currentStart <= end) { + let currentEnd = Math.min(currentStart + oneWeek, end); + + // avoid edge case where remaining interval is a single second + if (currentEnd + 1 === end) { + currentEnd -= 1; + } + + jobs.push( + new IndexerJob({ + startTimestamp: currentStart, + endTimestamp: currentEnd, + order: order, + status: IndexerStatus.PENDING, + }), + ); + currentStart = currentEnd + 1; + order += 1; + } + + return jobs; + } +} diff --git a/src/private.app.module.ts b/src/private.app.module.ts index 39eff5a97..fae2896b8 100644 --- a/src/private.app.module.ts +++ b/src/private.app.module.ts @@ -8,6 +8,7 @@ import { TokenController } from './modules/tokens/token.controller'; import { TokenModule } from './modules/tokens/token.module'; import { DynamicModuleUtils } from './utils/dynamic.module.utils'; import { ESTransactionsService } from './services/elastic-search/services/es.transactions.service'; +import { AnalyticsIndexerModule } from './modules/analytics-indexer/analytics.indexer.module'; @Module({ imports: [ @@ -16,6 +17,7 @@ import { ESTransactionsService } from './services/elastic-search/services/es.tra TokenModule, RemoteConfigModule, DynamicModuleUtils.getCacheModule(), + AnalyticsIndexerModule, ], controllers: [MetricsController, TokenController, RemoteConfigController], providers: [ESTransactionsService], From f5182521b9fb68946e1dac594378522da67f3d15 Mon Sep 17 00:00:00 2001 From: hschiau Date: Wed, 23 Oct 2024 20:13:47 +0300 Subject: [PATCH 3/3] SERVICES-2668: move repository service and update imports --- .../analytics-indexer/analytics.indexer.controller.ts | 2 +- src/modules/analytics-indexer/analytics.indexer.module.ts | 6 +++++- .../services/indexer.persistence.service.ts | 2 +- .../services/indexer.session.repository.service.ts} | 4 ++-- 4 files changed, 9 insertions(+), 5 deletions(-) rename src/{services/database/repositories/indexer.session.repository.ts => modules/analytics-indexer/services/indexer.session.repository.service.ts} (78%) diff --git a/src/modules/analytics-indexer/analytics.indexer.controller.ts b/src/modules/analytics-indexer/analytics.indexer.controller.ts index 32a0d81e2..4cddda58c 100644 --- a/src/modules/analytics-indexer/analytics.indexer.controller.ts +++ b/src/modules/analytics-indexer/analytics.indexer.controller.ts @@ -12,7 +12,7 @@ import { import mongoose from 'mongoose'; import { CacheService } from '@multiversx/sdk-nestjs-cache'; import { Constants } from '@multiversx/sdk-nestjs-common'; -import { IndexerSessionRepositoryService } from 'src/services/database/repositories/indexer.session.repository'; +import { IndexerSessionRepositoryService } from './services/indexer.session.repository.service'; import { IndexerSession } from './schemas/indexer.session.schema'; import { CreateSessionDto } from './entities/create.session.dto'; import { IndexerPersistenceService } from './services/indexer.persistence.service'; diff --git a/src/modules/analytics-indexer/analytics.indexer.module.ts b/src/modules/analytics-indexer/analytics.indexer.module.ts index decf9eeb6..913fbd48b 100644 --- a/src/modules/analytics-indexer/analytics.indexer.module.ts +++ b/src/modules/analytics-indexer/analytics.indexer.module.ts @@ -13,6 +13,8 @@ import { IndexerPriceDiscoveryService } from './services/indexer.price.discovery import { IndexerSwapHandlerService } from './services/event-handlers/indexer.swap.handler.service'; import { IndexerLiquidityHandlerService } from './services/event-handlers/indexer.liquidity.handler.service'; import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/indexer.price.discovery.handler.service'; +import { IndexerSessionRepositoryService } from './services/indexer.session.repository.service'; +import { IndexerPersistenceService } from './services/indexer.persistence.service'; import { AnalyticsIndexerController } from './analytics.indexer.controller'; import { DatabaseModule } from 'src/services/database/database.module'; import { MongooseModule } from '@nestjs/mongoose'; @@ -20,10 +22,11 @@ import { IndexerSession, IndexerSessionSchema, } from './schemas/indexer.session.schema'; -import { IndexerSessionRepositoryService } from 'src/services/database/repositories/indexer.session.repository'; +import { MXCommunicationModule } from 'src/services/multiversx-communication/mx.communication.module'; @Module({ imports: [ + MXCommunicationModule, PairModule, RouterModule, TokenModule, @@ -45,6 +48,7 @@ import { IndexerSessionRepositoryService } from 'src/services/database/repositor IndexerLiquidityHandlerService, IndexerPriceDiscoveryHandlerService, IndexerSessionRepositoryService, + IndexerPersistenceService, ], exports: [IndexerService], controllers: [AnalyticsIndexerController], diff --git a/src/modules/analytics-indexer/services/indexer.persistence.service.ts b/src/modules/analytics-indexer/services/indexer.persistence.service.ts index 044362eb4..9accd5784 100644 --- a/src/modules/analytics-indexer/services/indexer.persistence.service.ts +++ b/src/modules/analytics-indexer/services/indexer.persistence.service.ts @@ -8,7 +8,7 @@ import { } from '../schemas/indexer.session.schema'; import moment from 'moment'; import { Constants } from '@multiversx/sdk-nestjs-common'; -import { IndexerSessionRepositoryService } from 'src/services/database/repositories/indexer.session.repository'; +import { IndexerSessionRepositoryService } from './indexer.session.repository.service'; import { CreateSessionDto } from '../entities/create.session.dto'; @Injectable() diff --git a/src/services/database/repositories/indexer.session.repository.ts b/src/modules/analytics-indexer/services/indexer.session.repository.service.ts similarity index 78% rename from src/services/database/repositories/indexer.session.repository.ts rename to src/modules/analytics-indexer/services/indexer.session.repository.service.ts index a209adb9a..9da7af4c6 100644 --- a/src/services/database/repositories/indexer.session.repository.ts +++ b/src/modules/analytics-indexer/services/indexer.session.repository.service.ts @@ -1,11 +1,11 @@ import { Injectable } from '@nestjs/common'; import { InjectModel } from '@nestjs/mongoose'; import { Model } from 'mongoose'; -import { EntityRepository } from './entity.repository'; +import { EntityRepository } from 'src/services/database/repositories/entity.repository'; import { IndexerSession, IndexerSessionDocument, -} from 'src/modules/analytics-indexer/schemas/indexer.session.schema'; +} from '../schemas/indexer.session.schema'; @Injectable() export class IndexerSessionRepositoryService extends EntityRepository {