Skip to content

Commit

Permalink
feat: use elastic apm instrumentation (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiendan committed Sep 15, 2023
1 parent 0394885 commit 1bf66b1
Show file tree
Hide file tree
Showing 17 changed files with 2,724 additions and 1,458 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ REDIS_PORT=
TOPOS_SUBNET_ENDPOINT=
SUBNET_REGISTRATOR_CONTRACT_ADDRESS=
TOPOS_CORE_PROXY_CONTRACT_ADDRESS=
TRACING_OTEL_COLLECTOR_ENDPOINT=
TRACING_SERVICE_NAME=
TRACING_SERVICE_VERSION=
ELASTIC_APM_ENDPOINT=
ELASTIC_APM_TOKEN=
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ jobs:

- run: npm ci
- run: npm run test
env:
ELASTIC_APM_ENDPOINT: ${{ secrets.ELASTIC_APM_ENDPOINT }}
ELASTIC_APM_TOKEN: ${{ secrets.ELASTIC_APM_TOKEN }}

e2e-tests:
name: E2E tests
Expand All @@ -42,3 +45,5 @@ jobs:
REDIS_HOST: ${{ vars.REDIS_HOST }}
REDIS_PORT: ${{ vars.REDIS_PORT }}
PRIVATE_KEY: ${{ secrets.PRIVATE_KEY }}
ELASTIC_APM_ENDPOINT: ${{ secrets.ELASTIC_APM_ENDPOINT }}
ELASTIC_APM_TOKEN: ${{ secrets.ELASTIC_APM_TOKEN }}
3,867 changes: 2,506 additions & 1,361 deletions package-lock.json

Large diffs are not rendered by default.

11 changes: 2 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,12 @@
"@nestjs/passport": "^9.0.0",
"@nestjs/platform-express": "^9.4.0",
"@nestjs/swagger": "^6.1.2",
"@opentelemetry/exporter-trace-otlp-proto": "^0.41.1",
"@opentelemetry/instrumentation-express": "^0.33.0",
"@opentelemetry/instrumentation-http": "^0.41.1",
"@opentelemetry/instrumentation-nestjs-core": "^0.33.0",
"@opentelemetry/resources": "^1.15.1",
"@opentelemetry/sdk-node": "^0.41.1",
"@opentelemetry/sdk-trace-base": "^1.15.1",
"@opentelemetry/semantic-conventions": "^1.15.1",
"@topos-protocol/topos-smart-contracts": "^1.2.0",
"@topos-protocol/topos-smart-contracts": "^1.2.3",
"bcrypt": "^5.1.0",
"bull": "^4.10.1",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"elastic-apm-node": "^4.0.0",
"ethers": "^5.7.1",
"jwks-rsa": "^2.1.5",
"passport": "^0.6.0",
Expand Down
11 changes: 11 additions & 0 deletions src/apm/apm.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Module } from '@nestjs/common'
import { ConfigModule } from '@nestjs/config'

import { ApmService } from './apm.service'

@Module({
imports: [ConfigModule],
exports: [ApmService],
providers: [ApmService],
})
export class ApmModule {}
41 changes: 41 additions & 0 deletions src/apm/apm.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Injectable } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import * as ElasticAPM from 'elastic-apm-node'

export const SERVICE_NAME =
process.env.TRACING_SERVICE_NAME || 'executor-service'
export const SERVICE_VERSION = process.env.TRACING_SERVICE_VERSION || 'unknown'
export const ELASTIC_APM_ENDPOINT = process.env.ELASTIC_APM_ENDPOINT || ''
export const ELASTIC_APM_TOKEN = process.env.ELASTIC_APM_TOKEN || ''

@Injectable()
export class ApmService {
private _apm: ElasticAPM.Agent

constructor(private configService: ConfigService) {
this._apm = ElasticAPM.start({
serviceName:
this.configService.get('TRACING_SERVICE_NAME') || 'executor-service',
secretToken: this.configService.get('ELASTIC_APM_TOKEN') || '',
serverUrl: this.configService.get('ELASTIC_APM_ENDPOINT') || '',
environment: this.configService.get('SERVICE_VERSION') || 'unknown',
opentelemetryBridgeEnabled: true,
captureBody: 'all',
})
}

startTransaction(name: string, traceparent?: string) {
return this._apm.startTransaction(
name,
traceparent
? {
childOf: traceparent,
}
: undefined
)
}

captureError(error: string) {
this._apm.captureError(error)
}
}
2 changes: 0 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ import { Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { BullModule } from '@nestjs/bull'

import { AuthModule } from './auth/auth.module'
import { ExecuteModuleV1 } from './execute/execute.module'

@Module({
imports: [
ConfigModule.forRoot(),
AuthModule,
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
Expand Down
2 changes: 1 addition & 1 deletion src/execute/execute.controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ExecuteControllerV1 } from './execute.controller'
import { ExecuteDto } from './execute.dto'
import { QUEUE_ERRORS } from './execute.errors'
import { ExecuteServiceV1 } from './execute.service'
import { Observable, observable } from 'rxjs'
import { Observable } from 'rxjs'

const validExecuteDto: ExecuteDto = {
logIndexes: [],
Expand Down
25 changes: 20 additions & 5 deletions src/execute/execute.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
Body,
Controller,
Get,
Headers,
Param,
Post,
Sse,
Expand All @@ -11,7 +12,7 @@ import {
import { ApiBearerAuth, ApiParam, ApiTags } from '@nestjs/swagger'

import { ExecuteDto } from './execute.dto'
import { ExecuteServiceV1 } from './execute.service'
import { ExecuteServiceV1, TracingOptions } from './execute.service'
import { JwtAuthGuard } from '../auth/jwt-auth.guard'

@Controller({ version: '1' })
Expand All @@ -22,8 +23,15 @@ export class ExecuteControllerV1 {
@Post('execute')
@ApiBearerAuth()
@UseGuards(JwtAuthGuard)
async executeV1(@Body() executeDto: ExecuteDto) {
return this.executeService.execute(executeDto).catch((error) => {
async executeV1(
@Body() executeDto: ExecuteDto,
@Headers('traceparent') traceparent?: string
) {
const args: [ExecuteDto, TracingOptions?] = [executeDto]
if (traceparent) {
args.push({ traceparent })
}
return this.executeService.execute(...args).catch((error) => {
throw new BadRequestException(error.message)
})
}
Expand All @@ -42,7 +50,14 @@ export class ExecuteControllerV1 {
@ApiBearerAuth()
@UseGuards(JwtAuthGuard)
@ApiParam({ name: 'jobId' })
async subscribeToJob(@Param('jobId') jobId: string) {
return this.executeService.subscribeToJobById(jobId)
async subscribeToJob(
@Param('jobId') jobId: string,
@Headers('traceparent') traceparent?: string
) {
const args: [string, TracingOptions?] = [jobId]
if (traceparent) {
args.push({ traceparent })
}
return this.executeService.subscribeToJobById(...args)
}
}
8 changes: 5 additions & 3 deletions src/execute/execute.module.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { BullModule } from '@nestjs/bull'
import { Module } from '@nestjs/common'
import { ConfigModule } from '@nestjs/config'
import { ConfigService } from '@nestjs/config'

import { ApmModule } from '../apm/apm.module'
import { AuthModule } from '../auth/auth.module'
import { ExecuteControllerV1 } from './execute.controller'
import { ExecutionProcessorV1 } from './execute.processor'
import { ExecuteServiceV1 } from './execute.service'

@Module({
controllers: [ExecuteControllerV1],
imports: [ConfigModule, BullModule.registerQueue({ name: 'execute' })],
providers: [ExecutionProcessorV1, ExecuteServiceV1],
imports: [BullModule.registerQueue({ name: 'execute' }), ApmModule, AuthModule],
providers: [ExecutionProcessorV1, ExecuteServiceV1, ConfigService],
})
export class ExecuteModuleV1 {}
19 changes: 17 additions & 2 deletions src/execute/execute.processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@ import { Job } from 'bull'
import { ethers } from 'ethers'
import { EventEmitter } from 'stream'

import { ApmService } from '../apm/apm.service'
import { ExecuteDto } from './execute.dto'
import { ExecutionProcessorV1 } from './execute.processor'
import { TracingOptions } from './execute.service'

const VALID_PRIVATE_KEY =
'0xc6cbd7d76bc5baca530c875663711b947efa6a86a900a9e8645ce32e5821484e'
const TOPOS_CORE_PROXY_CONTRACT_ADDRESS =
'0x1D7b9f9b1FF6cf0A3BEB0F84fA6F8628E540E97F'
const TOPOS_SUBNET_ENDPOINT = 'topos-subnet-endpoint'

const validExecuteJob: Partial<Job<ExecuteDto>> = {
const validExecuteJob: Partial<Job<ExecuteDto & TracingOptions>> = {
data: {
logIndexes: [],
messagingContractAddress: '',
receiptTrieRoot: '',
receiptTrieMerkleProof: '',
subnetId: 'id',
traceparent: '',
},
progress: jest.fn(),
}
Expand Down Expand Up @@ -60,6 +63,18 @@ describe('ExecuteProcessor', () => {
}),
}
}

if (token === ApmService) {
return {
captureError: jest.fn(),
startTransaction: jest.fn().mockReturnValue({
end: jest.fn(),
startSpan: jest
.fn()
.mockReturnValue({ addLabels: jest.fn(), end: jest.fn() }),
}),
}
}
})
.compile()

Expand All @@ -79,7 +94,7 @@ describe('ExecuteProcessor', () => {
jest.spyOn<any, any>(ethers, 'Contract').mockReturnValue(contractMock)

await executeProcessor.execute(
validExecuteJob as unknown as Job<ExecuteDto>
validExecuteJob as unknown as Job<ExecuteDto & TracingOptions>
)

expect(ethersProviderMock).toHaveBeenCalledWith(
Expand Down
34 changes: 29 additions & 5 deletions src/execute/execute.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import {
import { Job } from 'bull'
import { ethers, providers } from 'ethers'

import { ApmService } from '../apm/apm.service'
import { sanitizeURLProtocol } from '../utils'
import { ExecuteDto } from './execute.dto'
import {
CONTRACT_ERRORS,
JOB_ERRORS,
PROVIDER_ERRORS,
WALLET_ERRORS,
} from './execute.errors'
import { sanitizeURLProtocol } from '../utils'
import { TracingOptions } from './execute.service'

const UNDEFINED_CERTIFICATE_ID =
'0x0000000000000000000000000000000000000000000000000000000000000000'
Expand All @@ -38,33 +40,47 @@ const UNDEFINED_CERTIFICATE_ID =
export class ExecutionProcessorV1 {
private readonly logger = new Logger(ExecutionProcessorV1.name)

constructor(private configService: ConfigService) {}
constructor(
private configService: ConfigService,
private apmService: ApmService
) {}

@Process('execute')
async execute(job: Job<ExecuteDto>) {
async execute(job: Job<ExecuteDto & TracingOptions>) {
const {
logIndexes,
messagingContractAddress,
receiptTrieMerkleProof,
receiptTrieRoot,
subnetId,
traceparent,
} = job.data

const toposCoreContractAddress = this.configService.get<string>(
const apmTransaction = this.apmService.startTransaction(
'root-processor',
traceparent
)
const executeSpan = apmTransaction.startSpan(`execute`)
executeSpan.addLabels({ data: JSON.stringify(job.data) })

const toposCoreProxyContractAddress = this.configService.get<string>(
'TOPOS_CORE_PROXY_CONTRACT_ADDRESS'
)
executeSpan.addLabels({ toposCoreProxyContractAddress })

const receivingSubnetEndpoint =
await this._getReceivingSubnetEndpointFromId(subnetId)
executeSpan.addLabels({ receivingSubnetEndpoint })

const provider = await this._createProvider(receivingSubnetEndpoint)
this.logger.debug(`ReceivingSubnet: ${receivingSubnetEndpoint}`)
executeSpan.addLabels({ provider: JSON.stringify(provider) })

const wallet = this._createWallet(provider)

const toposCoreContract = (await this._getContract(
provider,
toposCoreContractAddress,
toposCoreProxyContractAddress,
ToposCoreJSON.abi,
wallet
)) as ToposCore
Expand All @@ -91,9 +107,12 @@ export class ExecutionProcessorV1 {

if (certId == UNDEFINED_CERTIFICATE_ID) {
await job.moveToFailed({ message: JOB_ERRORS.MISSING_CERTIFICATE })
this.apmService.captureError(JOB_ERRORS.MISSING_CERTIFICATE)
return
}

executeSpan.addLabels({ certId })

await job.progress(50)

const tx = await messagingContract.execute(
Expand All @@ -105,7 +124,12 @@ export class ExecutionProcessorV1 {
}
)

executeSpan.addLabels({ tx: JSON.stringify(tx) })

return tx.wait().then(async (receipt) => {
executeSpan.addLabels({ receipt: JSON.stringify(receipt) })
executeSpan.end()
apmTransaction.end()
await job.progress(100)
return receipt
})
Expand Down
Loading

0 comments on commit 1bf66b1

Please sign in to comment.