Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SERVICES-2668] Rest endpoints for managing indexing sessions #1516

Open
wants to merge 3 commits into
base: SERVICES-2655-migrate-code-from-mx-exchange-analytics-indexer
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/modules/analytics-indexer/analytics.indexer.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import {
Body,
Controller,
Get,
HttpException,
HttpStatus,
Param,
Post,
UseGuards,
ValidationPipe,
} from '@nestjs/common';
import mongoose from 'mongoose';
import { CacheService } from '@multiversx/sdk-nestjs-cache';
import { Constants } from '@multiversx/sdk-nestjs-common';
import { IndexerSessionRepositoryService } from './services/indexer.session.repository.service';
import { IndexerSession } from './schemas/indexer.session.schema';
import { CreateSessionDto } from './entities/create.session.dto';
import { IndexerPersistenceService } from './services/indexer.persistence.service';
import { JwtOrNativeAdminGuard } from '../auth/jwt.or.native.admin.guard';

@Controller('analytics-indexer')
export class AnalyticsIndexerController {
constructor(
private readonly indexerSessionRepository: IndexerSessionRepositoryService,
private readonly indexerPersistenceService: IndexerPersistenceService,
private readonly cachingService: CacheService,
) {}

@UseGuards(JwtOrNativeAdminGuard)
@Get('/sessions')
async getSessions(): Promise<IndexerSession[]> {
return await this.indexerSessionRepository.find({});
}

@UseGuards(JwtOrNativeAdminGuard)
@Post('/sessions')
async addSession(
@Body(
new ValidationPipe({ whitelist: true, forbidNonWhitelisted: true }),
)
createSessionDto: CreateSessionDto,
): Promise<IndexerSession> {
try {
return await this.indexerPersistenceService.createIndexerSession(
createSessionDto,
);
} catch (error) {
throw new HttpException(error.message, HttpStatus.BAD_REQUEST);
}
}

@UseGuards(JwtOrNativeAdminGuard)
@Get('/sessions/:nameOrID')
async getSession(
@Param('nameOrID') nameOrID: string,
): Promise<IndexerSession> {
return await this.indexerSessionRepository.findOne(
mongoose.Types.ObjectId.isValid(nameOrID)
? { _id: nameOrID }
: { name: nameOrID },
);
}

@UseGuards(JwtOrNativeAdminGuard)
@Post('/sessions/:nameOrID/abort')
async abortSession(@Param('nameOrID') nameOrID: string): Promise<boolean> {
const session = await this.indexerSessionRepository.findOne(
mongoose.Types.ObjectId.isValid(nameOrID)
? { _id: nameOrID }
: { name: nameOrID },
);

if (!session) {
throw new HttpException('Session not found', HttpStatus.NOT_FOUND);
}

this.cachingService.set(
`indexer.abortSession.${session.name}`,
true,
Constants.oneHour(),
);

return true;
}
}
18 changes: 18 additions & 0 deletions src/modules/analytics-indexer/analytics.indexer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,29 @@ import { IndexerPriceDiscoveryService } from './services/indexer.price.discovery
import { IndexerSwapHandlerService } from './services/event-handlers/indexer.swap.handler.service';
import { IndexerLiquidityHandlerService } from './services/event-handlers/indexer.liquidity.handler.service';
import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/indexer.price.discovery.handler.service';
import { IndexerSessionRepositoryService } from './services/indexer.session.repository.service';
import { IndexerPersistenceService } from './services/indexer.persistence.service';
import { AnalyticsIndexerController } from './analytics.indexer.controller';
import { DatabaseModule } from 'src/services/database/database.module';
import { MongooseModule } from '@nestjs/mongoose';
import {
IndexerSession,
IndexerSessionSchema,
} from './schemas/indexer.session.schema';
import { MXCommunicationModule } from 'src/services/multiversx-communication/mx.communication.module';

@Module({
imports: [
MXCommunicationModule,
PairModule,
RouterModule,
TokenModule,
PriceDiscoveryModule,
ElasticSearchModule,
DatabaseModule,
MongooseModule.forFeature([
{ name: IndexerSession.name, schema: IndexerSessionSchema },
]),
],
providers: [
IndexerService,
Expand All @@ -32,7 +47,10 @@ import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/i
IndexerSwapHandlerService,
IndexerLiquidityHandlerService,
IndexerPriceDiscoveryHandlerService,
IndexerSessionRepositoryService,
IndexerPersistenceService,
],
exports: [IndexerService],
controllers: [AnalyticsIndexerController],
})
export class AnalyticsIndexerModule {}
25 changes: 25 additions & 0 deletions src/modules/analytics-indexer/entities/create.session.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import {
ArrayMinSize,
ArrayUnique,
IsArray,
IsEnum,
IsInt,
IsOptional,
IsPositive,
} from 'class-validator';
import { IndexerEventTypes } from './indexer.event.types';

export class CreateSessionDto {
@IsInt()
@IsPositive()
start: number;
@IsOptional()
@IsInt()
@IsPositive()
end?: number;
@IsArray()
@ArrayMinSize(1)
@ArrayUnique()
@IsEnum(IndexerEventTypes, { each: true })
eventTypes: IndexerEventTypes[];
}
52 changes: 52 additions & 0 deletions src/modules/analytics-indexer/schemas/indexer.session.schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose';
import { Document } from 'mongoose';
import { IndexerEventTypes } from '../entities/indexer.event.types';

export enum IndexerStatus {
PENDING = 'PENDING',
IN_PROGRESS = 'IN_PROGRESS',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
ABORTED = 'ABORTED',
}

export class IndexerJob {
startTimestamp: number;
endTimestamp: number;
order: number;
status: IndexerStatus;
runAttempts = 0;
errorCount = 0;
durationMs = 0;

constructor(init?: Partial<IndexerJob>) {
Object.assign(this, init);
}
}

export type IndexerSessionDocument = IndexerSession & Document;

@Schema({
collection: 'indexer_sessions',
})
export class IndexerSession {
@Prop({ required: true, unique: true })
name: string;
@Prop({ required: true })
startTimestamp: number;
@Prop({ required: true })
endTimestamp: number;
@Prop(() => [IndexerEventTypes])
eventTypes: IndexerEventTypes[];
@Prop(() => [IndexerJob])
jobs: IndexerJob[];
@Prop(() => IndexerStatus)
status: IndexerStatus;

constructor(init?: Partial<IndexerSession>) {
Object.assign(this, init);
}
}

export const IndexerSessionSchema =
SchemaFactory.createForClass(IndexerSession);
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Inject, Injectable } from '@nestjs/common';
import { WINSTON_MODULE_PROVIDER } from 'nest-winston';
import { Logger } from 'winston';
import {
IndexerJob,
IndexerSession,
IndexerStatus,
} from '../schemas/indexer.session.schema';
import moment from 'moment';
import { Constants } from '@multiversx/sdk-nestjs-common';
import { IndexerSessionRepositoryService } from './indexer.session.repository.service';
import { CreateSessionDto } from '../entities/create.session.dto';

@Injectable()
export class IndexerPersistenceService {
constructor(
private readonly indexerSessionRepository: IndexerSessionRepositoryService,
@Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger,
) {}

public async createIndexerSession(
createSessionDto: CreateSessionDto,
): Promise<IndexerSession> {
const { start } = createSessionDto;

const end = !createSessionDto.end
? moment().unix()
: createSessionDto.end;

if (start >= end) {
throw new Error('End timestamp should be after start');
}

const activeSession = await this.getActiveSession();

if (activeSession) {
throw new Error(
'Cannot start a session while another one is in progress.',
);
}

return await this.indexerSessionRepository.create({
name: `Session_${moment().unix()}`,
startTimestamp: start,
endTimestamp: end,
eventTypes: createSessionDto.eventTypes,
jobs: this.createSessionJobs(start, end),
status: IndexerStatus.PENDING,
});
}

public async getActiveSession(): Promise<IndexerSession> {
return await this.indexerSessionRepository.findOne({
status: {
$in: [IndexerStatus.IN_PROGRESS, IndexerStatus.PENDING],
},
});
}

private createSessionJobs(start: number, end: number): IndexerJob[] {
const jobs: IndexerJob[] = [];
const oneWeek = Constants.oneWeek();

let currentStart = start;
let order = 0;

while (currentStart <= end) {
let currentEnd = Math.min(currentStart + oneWeek, end);

// avoid edge case where remaining interval is a single second
if (currentEnd + 1 === end) {
currentEnd -= 1;
}

jobs.push(
new IndexerJob({
startTimestamp: currentStart,
endTimestamp: currentEnd,
order: order,
status: IndexerStatus.PENDING,
}),
);
currentStart = currentEnd + 1;
order += 1;
}

return jobs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Injectable } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { EntityRepository } from 'src/services/database/repositories/entity.repository';
import {
IndexerSession,
IndexerSessionDocument,
} from '../schemas/indexer.session.schema';

@Injectable()
export class IndexerSessionRepositoryService extends EntityRepository<IndexerSessionDocument> {
constructor(
@InjectModel(IndexerSession.name)
private readonly indexerSessionModel: Model<IndexerSessionDocument>,
) {
super(indexerSessionModel);
}
}
2 changes: 2 additions & 0 deletions src/private.app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { TokenController } from './modules/tokens/token.controller';
import { TokenModule } from './modules/tokens/token.module';
import { DynamicModuleUtils } from './utils/dynamic.module.utils';
import { ESTransactionsService } from './services/elastic-search/services/es.transactions.service';
import { AnalyticsIndexerModule } from './modules/analytics-indexer/analytics.indexer.module';

@Module({
imports: [
Expand All @@ -16,6 +17,7 @@ import { ESTransactionsService } from './services/elastic-search/services/es.tra
TokenModule,
RemoteConfigModule,
DynamicModuleUtils.getCacheModule(),
AnalyticsIndexerModule,
],
controllers: [MetricsController, TokenController, RemoteConfigController],
providers: [ESTransactionsService],
Expand Down