Skip to content

Commit

Permalink
feat: use elastic apm instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiendan committed Sep 14, 2023
1 parent 0394885 commit 2cd0200
Show file tree
Hide file tree
Showing 11 changed files with 2,621 additions and 1,443 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ REDIS_PORT=
TOPOS_SUBNET_ENDPOINT=
SUBNET_REGISTRATOR_CONTRACT_ADDRESS=
TOPOS_CORE_PROXY_CONTRACT_ADDRESS=
TRACING_OTEL_COLLECTOR_ENDPOINT=
TRACING_SERVICE_NAME=
TRACING_SERVICE_VERSION=
ELASTIC_APM_ENDPOINT=
ELASTIC_APM_TOKEN=
3,867 changes: 2,506 additions & 1,361 deletions package-lock.json

Large diffs are not rendered by default.

11 changes: 2 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,12 @@
"@nestjs/passport": "^9.0.0",
"@nestjs/platform-express": "^9.4.0",
"@nestjs/swagger": "^6.1.2",
"@opentelemetry/exporter-trace-otlp-proto": "^0.41.1",
"@opentelemetry/instrumentation-express": "^0.33.0",
"@opentelemetry/instrumentation-http": "^0.41.1",
"@opentelemetry/instrumentation-nestjs-core": "^0.33.0",
"@opentelemetry/resources": "^1.15.1",
"@opentelemetry/sdk-node": "^0.41.1",
"@opentelemetry/sdk-trace-base": "^1.15.1",
"@opentelemetry/semantic-conventions": "^1.15.1",
"@topos-protocol/topos-smart-contracts": "^1.2.0",
"@topos-protocol/topos-smart-contracts": "^1.2.2",
"bcrypt": "^5.1.0",
"bull": "^4.10.1",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"elastic-apm-node": "^4.0.0",
"ethers": "^5.7.1",
"jwks-rsa": "^2.1.5",
"passport": "^0.6.0",
Expand Down
2 changes: 1 addition & 1 deletion src/execute/execute.controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ExecuteControllerV1 } from './execute.controller'
import { ExecuteDto } from './execute.dto'
import { QUEUE_ERRORS } from './execute.errors'
import { ExecuteServiceV1 } from './execute.service'
import { Observable, observable } from 'rxjs'
import { Observable } from 'rxjs'

const validExecuteDto: ExecuteDto = {
logIndexes: [],
Expand Down
21 changes: 15 additions & 6 deletions src/execute/execute.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
Body,
Controller,
Get,
Headers,
Param,
Post,
Sse,
Expand All @@ -22,10 +23,15 @@ export class ExecuteControllerV1 {
@Post('execute')
@ApiBearerAuth()
@UseGuards(JwtAuthGuard)
async executeV1(@Body() executeDto: ExecuteDto) {
return this.executeService.execute(executeDto).catch((error) => {
throw new BadRequestException(error.message)
})
async executeV1(
@Body() executeDto: ExecuteDto,
@Headers('traceparent') traceparent?: string
) {
return this.executeService
.execute(executeDto, { traceparent })
.catch((error) => {
throw new BadRequestException(error.message)
})
}

@ApiTags('job')
Expand All @@ -42,7 +48,10 @@ export class ExecuteControllerV1 {
@ApiBearerAuth()
@UseGuards(JwtAuthGuard)
@ApiParam({ name: 'jobId' })
async subscribeToJob(@Param('jobId') jobId: string) {
return this.executeService.subscribeToJobById(jobId)
async subscribeToJob(
@Param('jobId') jobId: string,
@Headers('traceparent') traceparent?: string
) {
return this.executeService.subscribeToJobById(jobId, { traceparent })
}
}
6 changes: 4 additions & 2 deletions src/execute/execute.processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ import { EventEmitter } from 'stream'

import { ExecuteDto } from './execute.dto'
import { ExecutionProcessorV1 } from './execute.processor'
import { TracingOptions } from './execute.service'

const VALID_PRIVATE_KEY =
'0xc6cbd7d76bc5baca530c875663711b947efa6a86a900a9e8645ce32e5821484e'
const TOPOS_CORE_PROXY_CONTRACT_ADDRESS =
'0x1D7b9f9b1FF6cf0A3BEB0F84fA6F8628E540E97F'
const TOPOS_SUBNET_ENDPOINT = 'topos-subnet-endpoint'

const validExecuteJob: Partial<Job<ExecuteDto>> = {
const validExecuteJob: Partial<Job<ExecuteDto & TracingOptions>> = {
data: {
logIndexes: [],
messagingContractAddress: '',
receiptTrieRoot: '',
receiptTrieMerkleProof: '',
subnetId: 'id',
traceparent: '',
},
progress: jest.fn(),
}
Expand Down Expand Up @@ -79,7 +81,7 @@ describe('ExecuteProcessor', () => {
jest.spyOn<any, any>(ethers, 'Contract').mockReturnValue(contractMock)

await executeProcessor.execute(
validExecuteJob as unknown as Job<ExecuteDto>
validExecuteJob as unknown as Job<ExecuteDto & TracingOptions>
)

expect(ethersProviderMock).toHaveBeenCalledWith(
Expand Down
24 changes: 22 additions & 2 deletions src/execute/execute.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import {
import { Job } from 'bull'
import { ethers, providers } from 'ethers'

import { apm } from '../main'
import { sanitizeURLProtocol } from '../utils'
import { ExecuteDto } from './execute.dto'
import {
CONTRACT_ERRORS,
JOB_ERRORS,
PROVIDER_ERRORS,
WALLET_ERRORS,
} from './execute.errors'
import { sanitizeURLProtocol } from '../utils'
import { TracingOptions } from './execute.service'

const UNDEFINED_CERTIFICATE_ID =
'0x0000000000000000000000000000000000000000000000000000000000000000'
Expand All @@ -41,24 +43,34 @@ export class ExecutionProcessorV1 {
constructor(private configService: ConfigService) {}

@Process('execute')
async execute(job: Job<ExecuteDto>) {
async execute(job: Job<ExecuteDto & TracingOptions>) {
const {
logIndexes,
messagingContractAddress,
receiptTrieMerkleProof,
receiptTrieRoot,
subnetId,
traceparent,
} = job.data

const apmTransaction = apm.startTransaction('root-processor', {
childOf: traceparent,
})
const executeSpan = apmTransaction.startSpan(`execute`)
executeSpan.addLabels({ data: JSON.stringify(job.data) })

const toposCoreContractAddress = this.configService.get<string>(
'TOPOS_CORE_PROXY_CONTRACT_ADDRESS'
)
executeSpan.addLabels({ toposCoreContractAddress })

const receivingSubnetEndpoint =
await this._getReceivingSubnetEndpointFromId(subnetId)
executeSpan.addLabels({ receivingSubnetEndpoint })

const provider = await this._createProvider(receivingSubnetEndpoint)
this.logger.debug(`ReceivingSubnet: ${receivingSubnetEndpoint}`)
executeSpan.addLabels({ provider: JSON.stringify(provider) })

const wallet = this._createWallet(provider)

Expand Down Expand Up @@ -91,9 +103,12 @@ export class ExecutionProcessorV1 {

if (certId == UNDEFINED_CERTIFICATE_ID) {
await job.moveToFailed({ message: JOB_ERRORS.MISSING_CERTIFICATE })
apm.captureError(JOB_ERRORS.MISSING_CERTIFICATE)
return
}

executeSpan.addLabels({ certId })

await job.progress(50)

const tx = await messagingContract.execute(
Expand All @@ -105,7 +120,12 @@ export class ExecutionProcessorV1 {
}
)

executeSpan.addLabels({ tx: JSON.stringify(tx) })

return tx.wait().then(async (receipt) => {
executeSpan.addLabels({ receipt: JSON.stringify(receipt) })
executeSpan.end()
apmTransaction.end()
await job.progress(100)
return receipt
})
Expand Down
22 changes: 16 additions & 6 deletions src/execute/execute.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Test, TestingModule } from '@nestjs/testing'
import { Job } from 'bull'

import { ExecuteDto } from './execute.dto'
import { ExecuteServiceV1 } from './execute.service'
import { ExecuteServiceV1, TracingOptions } from './execute.service'
import { ConfigService } from '@nestjs/config'
import { first, firstValueFrom, lastValueFrom } from 'rxjs'

Expand All @@ -15,6 +15,10 @@ const validExecuteDto: ExecuteDto = {
subnetId: '',
}

const validTracingOptions: TracingOptions = {
traceparent: '',
}

const VALID_PRIVATE_KEY =
'0xc6cbd7d76bc5baca530c875663711b947efa6a86a900a9e8645ce32e5821484e'

Expand Down Expand Up @@ -72,7 +76,7 @@ describe('ExecuteService', () => {

describe('execute', () => {
it('should call queue.add', () => {
executeService.execute(validExecuteDto)
executeService.execute(validExecuteDto, validTracingOptions)

expect(executeQueueMock.add).toHaveBeenCalledWith(
'execute',
Expand All @@ -95,7 +99,7 @@ describe('ExecuteService', () => {
it('should retrieve the correct job', () => {
const jobId = '1'
executeService
.subscribeToJobById(jobId)
.subscribeToJobById(jobId, validTracingOptions)
.pipe(first())
.subscribe(() => {
expect(executeQueueMock.getJob).toHaveBeenCalledWith(jobId)
Expand All @@ -106,7 +110,9 @@ describe('ExecuteService', () => {
it('should first next some progress', async () => {
const jobId = '1'
await expect(
firstValueFrom(executeService.subscribeToJobById(jobId))
firstValueFrom(
executeService.subscribeToJobById(jobId, validTracingOptions)
)
).resolves.toStrictEqual({
data: { payload: '', type: 'progress' },
})
Expand All @@ -115,7 +121,9 @@ describe('ExecuteService', () => {
it('should then complete', async () => {
const jobId = '1'
await expect(
lastValueFrom(executeService.subscribeToJobById(jobId))
lastValueFrom(
executeService.subscribeToJobById(jobId, validTracingOptions)
)
).resolves.toStrictEqual({
data: { payload: {}, type: 'completed' },
})
Expand All @@ -137,7 +145,9 @@ describe('ExecuteService', () => {
)

await expect(
lastValueFrom(executeService.subscribeToJobById(jobId))
lastValueFrom(
executeService.subscribeToJobById(jobId, validTracingOptions)
)
).rejects.toStrictEqual('errorMock')
})
})
Expand Down
40 changes: 35 additions & 5 deletions src/execute/execute.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import { Queue } from 'bull'
import { ethers } from 'ethers'
import { Observable } from 'rxjs'

import { apm } from '../main'
import { ExecuteDto } from './execute.dto'
import { QUEUE_ERRORS, WALLET_ERRORS } from './execute.errors'

export interface TracingOptions {
traceparent: string
}

@Injectable()
export class ExecuteServiceV1 {
private readonly logger = new Logger(ExecuteServiceV1.name)
Expand All @@ -20,8 +25,19 @@ export class ExecuteServiceV1 {
this._verifyRedisAvailability()
}

async execute(executeDto: ExecuteDto) {
const { id, timestamp, ...rest } = await this._addExecutionJob(executeDto)
async execute(executeDto: ExecuteDto, { traceparent }: TracingOptions) {
const apmTransaction = apm.startTransaction('root-execute', {
childOf: traceparent,
})
const span = apmTransaction.startSpan(`add-execution-job`)

const { id, timestamp, ...rest } = await this._addExecutionJob(executeDto, {
traceparent,
})
span.addLabels({ id, timestamp })

span.end()
apmTransaction.end()
return { id, timestamp }
}

Expand All @@ -43,8 +59,14 @@ export class ExecuteServiceV1 {
return job
}

subscribeToJobById(jobId: string) {
subscribeToJobById(jobId: string, { traceparent }: TracingOptions) {
return new Observable<MessageEvent>((subscriber) => {
const apmTransaction = apm.startTransaction('root-subscribe', {
childOf: traceparent,
})
const span = apmTransaction.startSpan(`subscribe-to-job`)
span.addLabels({ jobId })

this.getJobById(jobId)
.then((job) => {
const progressListener = (job, progress) => {
Expand All @@ -64,13 +86,18 @@ export class ExecuteServiceV1 {
subscriber.complete()
})
.catch((error) => {
apm.captureError(error)
this.logger.debug(`Job failed!`)
this.logger.debug(error)
subscriber.error(error)
subscriber.complete()
})
.finally(() => {
span.end()
})
})
.catch((error) => {
apm.captureError(error)
this.logger.debug(`Job not found!`)
this.logger.debug(error)
subscriber.error(error)
Expand All @@ -79,9 +106,12 @@ export class ExecuteServiceV1 {
})
}

private async _addExecutionJob(executeDto: ExecuteDto) {
private async _addExecutionJob(
executeDto: ExecuteDto,
{ traceparent }: TracingOptions
) {
try {
return this.executionQueue.add('execute', executeDto)
return this.executionQueue.add('execute', { ...executeDto, traceparent })
} catch (error) {
this.logger.error(error)
}
Expand Down
26 changes: 16 additions & 10 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
import { ValidationPipe } from '@nestjs/common'
import { HttpAdapterHost, NestFactory } from '@nestjs/core'
import { NestFactory } from '@nestjs/core'
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'
import * as ElasticAPM from 'elastic-apm-node'

import { AppModule } from './app.module'
import { ExecuteModuleV1 } from './execute/execute.module'
import { AllExceptionsFilter } from './filters/all-exceptions.filter'
import { HttpExceptionFilter } from './filters/http-exception.filter'
import { otelSDK } from './tracing'

export const SERVICE_NAME = process.env.TRACING_SERVICE_NAME || 'executor-service'
export const SERVICE_VERSION = process.env.TRACING_SERVICE_VERSION || 'unknown'
export const ELASTIC_APM_ENDPOINT = process.env.ELASTIC_APM_ENDPOINT || ''
export const ELASTIC_APM_TOKEN = process.env.ELASTIC_APM_TOKEN || ''

export const apm = ElasticAPM.start({
serviceName: SERVICE_NAME,
secretToken: ELASTIC_APM_TOKEN,
serverUrl: ELASTIC_APM_ENDPOINT,
environment: 'local',
opentelemetryBridgeEnabled: true,
captureBody: 'all',
})

async function bootstrap() {
const app = await NestFactory.create(AppModule)
app.enableVersioning()
app.useGlobalPipes(new ValidationPipe())
app.enableCors()

await otelSDK.start()

const httpAdapter = app.get(HttpAdapterHost)
// app.useGlobalFilters(new HttpExceptionFilter())
// app.useGlobalFilters(new AllExceptionsFilter(httpAdapter))

const config = new DocumentBuilder()
.setTitle('Topos Executor Service')
.setDescription('The Topos Executor Service API description')
Expand Down
Loading

0 comments on commit 2cd0200

Please sign in to comment.