Skip to content

Commit

Permalink
feat: add PG instrumentation and some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandraEr committed Jan 27, 2025
1 parent 2341504 commit e5c4749
Show file tree
Hide file tree
Showing 8 changed files with 1,256 additions and 6 deletions.
25 changes: 25 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,28 @@ services:
environment:
POSTGRES_PASSWORD: postgres
##############################################################################
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
restart: "no"
ports:
- 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.2.0
restart: "no"
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- zookeeper
##############################################################################
8 changes: 8 additions & 0 deletions packages/app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,27 @@
},
"dependencies": {
"@byndyusoft/class-validator-extended": "1.0.1",
"@byndyusoft/nest-kafka": "2.4.1",
"@byndyusoft/nest-opentracing": "2.6.0",
"@byndyusoft/nest-pino": "3.1.2-1",
"@byndyusoft/nest-swagger": "6.3.0-1",
"@byndyusoft/pino-logger-factory": "3.0.1",
"@digikare/nestjs-prom": "1.0.0",
"@kafkajs/confluent-schema-registry": "3.6.1",
"@nestjs/axios": "0.1.0",
"@nestjs/common": "9.3.12",
"@nestjs/core": "9.3.12",
"@nestjs/microservices": "9.3.12",
"@nestjs/platform-express": "9.3.12",
"@nestjs/terminus": "9.2.2",
"@nestjs/typeorm": "9.0.1",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/context-async-hooks": "1.30.1",
"@opentelemetry/core": "1.30.1",
"@opentelemetry/exporter-trace-otlp-http": "0.57.1",
"@opentelemetry/instrumentation-express": "0.47.0",
"@opentelemetry/instrumentation-http": "0.57.1",
"@opentelemetry/instrumentation-kafkajs": "0.7.0",
"@opentelemetry/instrumentation-pg": "0.50.0",
"@opentelemetry/instrumentation-pino": "0.46.0",
"@opentelemetry/propagator-jaeger": "1.30.1",
Expand All @@ -97,6 +103,8 @@
"class-transformer": "0.5.1",
"class-validator": "0.14.0",
"helmet": "6.0.1",
"jaeger-client": "3.19.0",
"kafkajs": "2.2.4",
"lodash": "4.17.21",
"nestjs-otel": "6.1.2",
"open-telemetry-example-dtos": "workspace:*",
Expand Down
45 changes: 45 additions & 0 deletions packages/app/src/infrastructure/infrastructureModule.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { KafkaModule } from "@byndyusoft/nest-kafka";
import {
OpenTracingModule,
TracedHttpModule,
} from "@byndyusoft/nest-opentracing";
import { Logger } from "@byndyusoft/nest-pino";
import { ApiTags } from "@byndyusoft/nest-swagger";
import { PromController, PromModule } from "@digikare/nestjs-prom";
import { Module } from "@nestjs/common";
import { initTracerFromEnv } from "jaeger-client";
import { OpenTelemetryModule } from "nestjs-otel";

import { AboutModule } from "./about/aboutModule";
Expand All @@ -12,6 +18,8 @@ import { HealthCheckModule } from "./healthCheck/healthCheckModule";
import { LoggerModule } from "./logger/loggerModule";
import { PackageJsonModule } from "./packageJson/packageJsonModule";
import { PgModule } from "./pg/pgModule";
import { ConfigDto } from "./config";
import { PackageJsonDto } from "./packageJson";

ApiTags("Infrastructure")(PromController);

Expand All @@ -20,6 +28,31 @@ ApiTags("Infrastructure")(PromController);
OpenTelemetryModule.forRoot(),
// Initial modules
ConfigModule.forRoot(),
// @byndyusoft/nest-opentracing
OpenTracingModule.forRootAsync({
inject: [ConfigDto, PackageJsonDto],
useFactory: (configDto: ConfigDto, packageJson: PackageJsonDto) => ({
tracer: initTracerFromEnv(
{
serviceName: packageJson.name,
},
{
tags: {
version: packageJson.version,
env: configDto.configEnv,
},
},
),
applyRoutes: ["/api/*"],
ignoreRoutes: [],
logBodies: true,
}),
}),
TracedHttpModule.registerAsync({
useFactory: () => ({
logBodies: true,
}),
}),
PackageJsonModule,
ClientsModule,
LoggerModule,
Expand All @@ -34,6 +67,18 @@ ApiTags("Infrastructure")(PromController);
HealthCheckModule,
// Extra modules
PgModule,
KafkaModule.registerAsync({
useFactory: () => ({
connections: [
{
name: "test",
cluster: { brokers: ["localhost:9092"] },
consumer: { groupId: "consumer-group-id-test" },
},
],
topicPickerArgs: [{ topic: "users-test-topic" }],
}),
}),
// ExceptionsModule must be registered after all modules with exception filters
ExceptionsModule,
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { IncomingMessage } from "http";
import * as process from "process";

import { Span } from "@opentelemetry/api";
import { AsyncLocalStorageContextManager } from "@opentelemetry/context-async-hooks";
import {
CompositePropagator,
Expand All @@ -12,6 +13,8 @@ import {
ExpressLayerType,
} from "@opentelemetry/instrumentation-express";
import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
import { KafkaJsInstrumentation } from "@opentelemetry/instrumentation-kafkajs";
import { MessageInfo } from "@opentelemetry/instrumentation-kafkajs/build/src/types";
import { PgInstrumentation } from "@opentelemetry/instrumentation-pg";
import { PinoInstrumentation } from "@opentelemetry/instrumentation-pino";
import { JaegerPropagator } from "@opentelemetry/propagator-jaeger";
Expand Down Expand Up @@ -45,6 +48,13 @@ export const openTelemetrySetup = (serviceName: string): void => {
}),
new PinoInstrumentation(),
new PgInstrumentation(),
// не заработало. возможно, из-за зависимости от open-tracing в nest-kafka
new KafkaJsInstrumentation({
consumerHook(span: Span, info: MessageInfo) {
span.setAttribute("topic", info.topic);
span.setAttribute("partition", info.message.partition ?? "");
},
}),
],
});

Expand Down
14 changes: 14 additions & 0 deletions packages/app/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import fs from "fs/promises";
import path from "path";
import process from "process";

import { KafkaConsumer, KafkaRetryConsumer } from "@byndyusoft/nest-kafka";
import { Logger, LoggerErrorInterceptor } from "@byndyusoft/nest-pino";
import { DocumentBuilder, SwaggerModule } from "@byndyusoft/nest-swagger";
import { ValidationPipe, VersioningType } from "@nestjs/common";
import { NestFactory } from "@nestjs/core";
import { MicroserviceOptions } from "@nestjs/microservices";
import { NestExpressApplication } from "@nestjs/platform-express";
import helmet from "helmet";

Expand Down Expand Up @@ -58,6 +60,16 @@ function setupSwagger(app: NestExpressApplication): void {
);
}

function setupKafkaConsumer(app: NestExpressApplication): void {
app.connectMicroservice<MicroserviceOptions>({
strategy: app.get(KafkaConsumer),
});

app.connectMicroservice<MicroserviceOptions>({
strategy: app.get(KafkaRetryConsumer),
});
}

async function bootstrap(): Promise<void> {
// Start SDK before nestjs factory create
const packageJsonPath = path.join(process.cwd(), "package.json");
Expand All @@ -79,9 +91,11 @@ async function bootstrap(): Promise<void> {

setupApp(app);
setupSwagger(app);
setupKafkaConsumer(app);

const config = app.get(ConfigDto);
await app.listen(config.http.port, config.http.host);
await app.startAllMicroservices();

logger.log(
"Nest application listening on %s",
Expand Down
30 changes: 30 additions & 0 deletions packages/app/src/users/usersConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import {
KafkaConsumerEventPattern,
KafkaConsumerPayloadDecoder,
KafkaValue,
} from "@byndyusoft/nest-kafka";
import { HttpService } from "@nestjs/axios";
import { Controller, UseInterceptors } from "@nestjs/common";
import { UserDto } from "open-telemetry-example-dtos";

@Controller()
export class UsersConsumer {
public constructor(private readonly httpService: HttpService) {}

@KafkaConsumerEventPattern({
connectionName: "test",
topicPicker: (config: { topic: string }) => config.topic,
fromBeginning: true,
})
@UseInterceptors(
new KafkaConsumerPayloadDecoder({
value: "json",
}),
)
public async onMessage(@KafkaValue() value: UserDto): Promise<void> {
console.log(value);
await this.httpService.axiosRef.get("/api/v1/users/1", {
baseURL: "http://localhost:8088",
});
}
}
6 changes: 4 additions & 2 deletions packages/app/src/users/usersModule.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { HttpModule } from "@nestjs/axios";
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import * as entities from "open-telemetry-example-entities";

import * as dataAccess from "./dataAccess";
import * as mappers from "./mappers";
import * as useCases from "./useCases";
import { UsersConsumer } from "./usersConsumer";
import { UsersController } from "./usersController";
import { UsersService } from "./usersService";

@Module({
imports: [TypeOrmModule.forFeature(Object.values(entities))],
controllers: [UsersController],
imports: [TypeOrmModule.forFeature(Object.values(entities)), HttpModule],
controllers: [UsersController, UsersConsumer],
providers: [
UsersService,
...Object.values(dataAccess),
Expand Down
Loading

0 comments on commit e5c4749

Please sign in to comment.