Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiendan committed Nov 7, 2023
1 parent 2b5c544 commit 7bc694e
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 107 deletions.
11 changes: 6 additions & 5 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ AUTH0_AUDIENCE=
AUTH0_ISSUER_URL=
REDIS_HOST=
REDIS_PORT=
TOPOS_SUBNET_ENDPOINT_WS=
SUBNET_REGISTRATOR_CONTRACT_ADDRESS=
TOPOS_CORE_PROXY_CONTRACT_ADDRESS=
TRACING_SERVICE_NAME=
TRACING_SERVICE_VERSION=
ELASTIC_APM_ENDPOINT=
ELASTIC_APM_TOKEN=
TOPOS_SUBNET_ENDPOINT_WS=

# telemetry
OTEL_EXPORTER_OTLP_ENDPOINT=
OTEL_SERVICE_NAME=
OTEL_SERVICE_VERSION=
5 changes: 0 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ jobs:

- run: npm ci
- run: npm run test
env:
ELASTIC_APM_ENDPOINT: ${{ secrets.ELASTIC_APM_ENDPOINT }}
ELASTIC_APM_TOKEN: ${{ secrets.ELASTIC_APM_TOKEN }}

e2e-tests:
name: E2E tests
Expand All @@ -45,5 +42,3 @@ jobs:
REDIS_HOST: ${{ vars.REDIS_HOST }}
REDIS_PORT: ${{ vars.REDIS_PORT }}
PRIVATE_KEY: ${{ secrets.PRIVATE_KEY }}
ELASTIC_APM_ENDPOINT: ${{ secrets.ELASTIC_APM_ENDPOINT }}
ELASTIC_APM_TOKEN: ${{ secrets.ELASTIC_APM_TOKEN }}
4 changes: 2 additions & 2 deletions src/execute/execute.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Test, TestingModule } from '@nestjs/testing'
import { Observable } from 'rxjs'

import { ExecuteControllerV1 } from './execute.controller'
import { ExecuteDto } from './execute.dto'
import { QUEUE_ERRORS } from './execute.errors'
import { ExecuteServiceV1 } from './execute.service'
import { Observable } from 'rxjs'

const validExecuteDto: ExecuteDto = {
logIndexes: [],
Expand All @@ -28,7 +28,7 @@ describe('ExecuteController', () => {
return {
execute: jest.fn().mockResolvedValue({}),
getJobById: jest.fn().mockResolvedValue({}),
subscribeToJobById: jest.fn().mockResolvedValue({}),
subscribeToJobById: jest.fn().mockReturnValue({ pipe: jest.fn() }),
}
}
})
Expand Down
7 changes: 4 additions & 3 deletions src/execute/execute.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import {
SpanStatusCode,
trace,
} from '@opentelemetry/api'
import { tap } from 'rxjs'

import { JwtAuthGuard } from '../auth/jwt-auth.guard'
import { getErrorMessage } from '../utils'
import { ExecuteDto } from './execute.dto'
import { ExecuteServiceV1, TracingOptions } from './execute.service'
import { JwtAuthGuard } from '../auth/jwt-auth.guard'
import { finalize, tap } from 'rxjs'
import { getErrorMessage } from 'src/utils'

@Controller({ version: '1' })
export class ExecuteControllerV1 {
Expand Down Expand Up @@ -115,6 +115,7 @@ export class ExecuteControllerV1 {
span.end()
},
complete: () => {
span.setStatus({ code: SpanStatusCode.OK })
span.end()
},
})
Expand Down
39 changes: 19 additions & 20 deletions src/execute/execute.processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Job } from 'bull'
import { ethers } from 'ethers'
import { EventEmitter } from 'stream'

import { ApmService } from '../apm/apm.service'
import { ExecuteDto } from './execute.dto'
import { ExecutionProcessorV1 } from './execute.processor'
import { TracingOptions } from './execute.service'
Expand All @@ -15,15 +14,21 @@ const TOPOS_CORE_PROXY_CONTRACT_ADDRESS =
'0x1D7b9f9b1FF6cf0A3BEB0F84fA6F8628E540E97F'
const TOPOS_SUBNET_ENDPOINT_WS = 'ws://topos-subnet-endpoint/ws'

const validExecuteJob: Partial<Job<ExecuteDto & TracingOptions>> = {
const validExecuteJob: Partial<
Job<ExecuteDto & { tracingOptions: TracingOptions }>
> = {
data: {
logIndexes: [],
messagingContractAddress: '',
receiptTrieRoot: '',
receiptTrieMerkleProof: '',
subnetId: 'id',
traceparent: '',
tracingOptions: {
traceparent: '',
tracestate: '',
},
},
moveToFailed: jest.fn(),
progress: jest.fn(),
}

Expand All @@ -37,7 +42,7 @@ const contractMock = {
execute: jest.fn().mockResolvedValue(transactionMock),
networkSubnetId: jest.fn().mockResolvedValue(''),
subnets: jest.fn().mockResolvedValue(subnetMock),
txRootToCertId: jest.fn().mockResolvedValue(''),
receiptRootToCertId: jest.fn().mockResolvedValue(''),
}

describe('ExecuteProcessor', () => {
Expand All @@ -50,8 +55,9 @@ describe('ExecuteProcessor', () => {
})
.useMocker((token) => {
if (token === ConfigService) {
return {
get: jest.fn().mockImplementation((key: string) => {
const configGetterMock = jest
.fn()
.mockImplementation((key: string) => {
switch (key) {
case 'PRIVATE_KEY':
return VALID_PRIVATE_KEY
Expand All @@ -60,19 +66,10 @@ describe('ExecuteProcessor', () => {
case 'TOPOS_SUBNET_ENDPOINT_WS':
return TOPOS_SUBNET_ENDPOINT_WS
}
}),
}
}

if (token === ApmService) {
})
return {
captureError: jest.fn(),
startTransaction: jest.fn().mockReturnValue({
end: jest.fn(),
startSpan: jest
.fn()
.mockReturnValue({ addLabels: jest.fn(), end: jest.fn() }),
}),
get: configGetterMock,
getOrThrow: configGetterMock,
}
}
})
Expand All @@ -94,7 +91,9 @@ describe('ExecuteProcessor', () => {
jest.spyOn<any, any>(ethers, 'Contract').mockReturnValue(contractMock)

await executeProcessor.execute(
validExecuteJob as unknown as Job<ExecuteDto & TracingOptions>
validExecuteJob as unknown as Job<
ExecuteDto & { tracingOptions: TracingOptions }
>
)

expect(ethersProviderMock).toHaveBeenCalledWith(TOPOS_SUBNET_ENDPOINT_WS)
Expand All @@ -104,7 +103,7 @@ describe('ExecuteProcessor', () => {
providerMock
)

expect(contractMock.txRootToCertId).toHaveBeenCalled()
expect(contractMock.receiptRootToCertId).toHaveBeenCalled()

expect(validExecuteJob.progress).toHaveBeenCalledWith(50)

Expand Down
2 changes: 1 addition & 1 deletion src/execute/execute.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
import { Job } from 'bull'
import { Contract, ethers, providers } from 'ethers'

import { getErrorMessage, sanitizeURLProtocol } from '../utils'
import { getErrorMessage } from '../utils'
import { ExecuteDto } from './execute.dto'
import {
CONTRACT_ERRORS,
Expand Down
30 changes: 6 additions & 24 deletions src/execute/execute.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Test, TestingModule } from '@nestjs/testing'
import { Job } from 'bull'
import { first, firstValueFrom, lastValueFrom } from 'rxjs'

import { ApmService } from '../apm/apm.service'
import { ExecuteDto } from './execute.dto'
import { ExecuteServiceV1, TracingOptions } from './execute.service'

Expand All @@ -18,6 +17,7 @@ const validExecuteDto: ExecuteDto = {

const validTracingOptions: TracingOptions = {
traceparent: '',
tracestate: '',
}

const VALID_PRIVATE_KEY =
Expand Down Expand Up @@ -67,18 +67,6 @@ describe('ExecuteService', () => {
}),
}
}

if (token === ApmService) {
return {
captureError: jest.fn(),
startTransaction: jest.fn().mockReturnValue({
end: jest.fn(),
startSpan: jest
.fn()
.mockReturnValue({ addLabels: jest.fn(), end: jest.fn() }),
}),
}
}
})
.overrideProvider(getQueueToken('execute'))
.useValue(executeQueueMock)
Expand All @@ -93,7 +81,7 @@ describe('ExecuteService', () => {

expect(executeQueueMock.add).toHaveBeenCalledWith('execute', {
...validExecuteDto,
...validTracingOptions,
tracingOptions: validTracingOptions,
})
})
})
Expand All @@ -112,7 +100,7 @@ describe('ExecuteService', () => {
it('should retrieve the correct job', () => {
const jobId = '1'
executeService
.subscribeToJobById(jobId, validTracingOptions)
.subscribeToJobById(jobId)
.pipe(first())
.subscribe(() => {
expect(executeQueueMock.getJob).toHaveBeenCalledWith(jobId)
Expand All @@ -123,9 +111,7 @@ describe('ExecuteService', () => {
it('should first next some progress', async () => {
const jobId = '1'
await expect(
firstValueFrom(
executeService.subscribeToJobById(jobId, validTracingOptions)
)
firstValueFrom(executeService.subscribeToJobById(jobId))
).resolves.toStrictEqual({
data: { payload: '', type: 'progress' },
})
Expand All @@ -134,9 +120,7 @@ describe('ExecuteService', () => {
it('should then complete', async () => {
const jobId = '1'
await expect(
lastValueFrom(
executeService.subscribeToJobById(jobId, validTracingOptions)
)
lastValueFrom(executeService.subscribeToJobById(jobId))
).resolves.toStrictEqual({
data: { payload: {}, type: 'completed' },
})
Expand All @@ -158,9 +142,7 @@ describe('ExecuteService', () => {
)

await expect(
lastValueFrom(
executeService.subscribeToJobById(jobId, validTracingOptions)
)
lastValueFrom(executeService.subscribeToJobById(jobId))
).rejects.toStrictEqual('errorMock')
})
})
Expand Down
93 changes: 51 additions & 42 deletions src/execute/execute.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Observable } from 'rxjs'

import { ExecuteDto } from './execute.dto'
import { QUEUE_ERRORS, WALLET_ERRORS } from './execute.errors'
import { getErrorMessage } from 'src/utils'
import { getErrorMessage } from '../utils'

export interface TracingOptions {
traceparent: string
Expand Down Expand Up @@ -71,11 +71,15 @@ export class ExecuteServiceV1 {
throw new Error(QUEUE_ERRORS.JOB_NOT_FOUND)
}

span.setStatus({ code: SpanStatusCode.ERROR })
span.setStatus({
code: SpanStatusCode.ERROR,
message: failedJob.failedReason,
})
span.end()
return failedJob
}

span.setAttribute('job', JSON.stringify(job))
span.setStatus({ code: SpanStatusCode.OK })
span.end()
return job
Expand All @@ -87,47 +91,52 @@ export class ExecuteServiceV1 {
return new Observable<MessageEvent>((subscriber) => {
span.setAttribute('jobId', jobId)

this.getJobById(jobId)
.then((job) => {
const progressListener = (job, progress) => {
if (job.id === jobId) {
this.logger.debug(`Job progress: ${progress}`)
span.addEvent('got progress update', { progress })
subscriber.next({
data: { payload: progress, type: 'progress' },
})
context.with(trace.setSpan(context.active(), span), async () => {
this.getJobById(jobId)
.then((job) => {
const progressListener = (job, progress) => {
if (job.id === jobId) {
this.logger.debug(`Job progress: ${progress}`)
span.addEvent('got progress update', { progress })
subscriber.next({
data: { payload: progress, type: 'progress' },
})
}
}
}

this.executionQueue.on('progress', progressListener)
job
.finished()
.then((payload) => {
this.logger.debug(`Job completed!`)
this.executionQueue.removeListener('progress', progressListener)
span.setStatus({ code: SpanStatusCode.OK })
subscriber.next({ data: { payload, type: 'completed' } })
subscriber.complete()
})
.catch((error) => {
this.logger.debug(`Job failed!`)
this.logger.debug(error)
span.setStatus({ code: SpanStatusCode.ERROR, message: error })
subscriber.error(error)
subscriber.complete()
})
.finally(() => {
span.end()
})
})
.catch((error) => {
this.logger.debug(`Job not found!`)
this.logger.debug(error)
span.setStatus({ code: SpanStatusCode.ERROR, message: error })
span.end()
subscriber.error(error)
subscriber.complete()
})

this.executionQueue.on('progress', progressListener)
job
.finished()
.then((payload) => {
this.logger.debug(`Job completed!`)
this.executionQueue.removeListener(
'progress',
progressListener
)
span.setStatus({ code: SpanStatusCode.OK })
subscriber.next({ data: { payload, type: 'completed' } })
subscriber.complete()
})
.catch((error) => {
this.logger.debug(`Job failed!`)
this.logger.debug(error)
span.setStatus({ code: SpanStatusCode.ERROR, message: error })
subscriber.error(error)
subscriber.complete()
})
.finally(() => {
span.end()
})
})
.catch((error) => {
this.logger.debug(`Job not found!`)
this.logger.debug(error)
span.setStatus({ code: SpanStatusCode.ERROR, message: error })
span.end()
subscriber.error(error)
subscriber.complete()
})
})
})
})
}
Expand Down
1 change: 0 additions & 1 deletion src/telemetry/telemetry.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ export class TelemetryService {
exporter: new OTLPMetricExporter({
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/metrics`,
}),
exportIntervalMillis: 5000,
}),
instrumentations: [],
})
Expand Down
Loading

0 comments on commit 7bc694e

Please sign in to comment.