From 1285e4bd41ca21cffcf2ca42e060170f4014ec74 Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Tue, 10 Sep 2024 12:39:27 +0300 Subject: [PATCH] feat(node): Add `kafkajs` integration (#13528) --- .../node-integration-tests/package.json | 1 + .../suites/tracing/kafkajs/docker-compose.yml | 7 ++ .../suites/tracing/kafkajs/scenario.js | 63 +++++++++++++++ .../suites/tracing/kafkajs/test.ts | 55 +++++++++++++ packages/astro/src/index.server.ts | 1 + packages/aws-serverless/src/index.ts | 1 + packages/bun/src/index.ts | 1 + packages/google-cloud-serverless/src/index.ts | 1 + packages/node/package.json | 1 + packages/node/src/index.ts | 1 + .../node/src/integrations/tracing/index.ts | 3 + .../node/src/integrations/tracing/kafka.ts | 37 +++++++++ packages/remix/src/index.server.ts | 1 + packages/solidstart/src/server/index.ts | 1 + packages/sveltekit/src/server/index.ts | 1 + yarn.lock | 77 +++++-------------- 16 files changed, 196 insertions(+), 56 deletions(-) create mode 100644 dev-packages/node-integration-tests/suites/tracing/kafkajs/docker-compose.yml create mode 100644 dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.js create mode 100644 dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts create mode 100644 packages/node/src/integrations/tracing/kafka.ts diff --git a/dev-packages/node-integration-tests/package.json b/dev-packages/node-integration-tests/package.json index 486b93bba24f..65c204a30dd8 100644 --- a/dev-packages/node-integration-tests/package.json +++ b/dev-packages/node-integration-tests/package.json @@ -48,6 +48,7 @@ "graphql": "^16.3.0", "http-terminator": "^3.2.0", "ioredis": "^5.4.1", + "kafkajs": "2.2.4", "mongodb": "^3.7.3", "mongodb-memory-server-global": "^7.6.3", "mongoose": "^5.13.22", diff --git a/dev-packages/node-integration-tests/suites/tracing/kafkajs/docker-compose.yml b/dev-packages/node-integration-tests/suites/tracing/kafkajs/docker-compose.yml new file mode 100644 index 000000000000..f744bfe6d50c --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/kafkajs/docker-compose.yml @@ -0,0 +1,7 @@ +services: + db: + image: apache/kafka:latest + restart: always + container_name: integration-tests-kafka + ports: + - '9092:9092' diff --git a/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.js b/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.js new file mode 100644 index 000000000000..d4541aa3a7de --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.js @@ -0,0 +1,63 @@ +const { loggingTransport } = require('@sentry-internal/node-integration-tests'); +const Sentry = require('@sentry/node'); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + release: '1.0', + tracesSampleRate: 1.0, + transport: loggingTransport, +}); + +// Stop the process from exiting before the transaction is sent +setInterval(() => {}, 1000); + +const { Kafka } = require('kafkajs'); + +async function run() { + const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['localhost:9092'], + }); + + const admin = kafka.admin(); + await admin.connect(); + + const producer = kafka.producer(); + await producer.connect(); + + await admin.createTopics({ + topics: [{ topic: 'test-topic' }], + }); + + const consumer = kafka.consumer({ + groupId: 'test-group', + }); + + await consumer.connect(); + await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }); + + consumer.run({ + eachMessage: async ({ message }) => { + // eslint-disable-next-line no-console + console.debug('Received message', message.value.toString()); + }, + }); + + // Wait for the consumer to be ready + await new Promise(resolve => setTimeout(resolve, 4000)); + + await producer.send({ + topic: 'test-topic', + messages: [ + { + value: 'TEST_MESSAGE', + }, + ], + }); + + // Wait for the message to be received + await new Promise(resolve => setTimeout(resolve, 5000)); +} + +// eslint-disable-next-line @typescript-eslint/no-floating-promises +run(); diff --git a/dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts b/dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts new file mode 100644 index 000000000000..f818af1d676a --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts @@ -0,0 +1,55 @@ +import { cleanupChildProcesses, createRunner } from '../../../utils/runner'; + +// When running docker compose, we need a larger timeout, as this takes some time... +jest.setTimeout(60_000); + +describe('kafkajs', () => { + afterAll(() => { + cleanupChildProcesses(); + }); + + test('traces producers and consumers', done => { + createRunner(__dirname, 'scenario.js') + .withDockerCompose({ + workingDirectory: [__dirname], + readyMatches: ['9092'], + }) + .expect({ + transaction: { + transaction: 'test-topic', + contexts: { + trace: expect.objectContaining({ + op: 'message', + status: 'ok', + data: expect.objectContaining({ + 'messaging.system': 'kafka', + 'messaging.destination': 'test-topic', + 'otel.kind': 'PRODUCER', + 'sentry.op': 'message', + 'sentry.origin': 'auto.kafkajs.otel.producer', + }), + }), + }, + }, + }) + .expect({ + transaction: { + transaction: 'test-topic', + contexts: { + trace: expect.objectContaining({ + op: 'message', + status: 'ok', + data: expect.objectContaining({ + 'messaging.system': 'kafka', + 'messaging.destination': 'test-topic', + 'otel.kind': 'CONSUMER', + 'sentry.op': 'message', + 'sentry.origin': 'auto.kafkajs.otel.consumer', + }), + }), + }, + }, + }) + .start(done); + }); +}); diff --git a/packages/astro/src/index.server.ts b/packages/astro/src/index.server.ts index 747870da3014..2645151a9ede 100644 --- a/packages/astro/src/index.server.ts +++ b/packages/astro/src/index.server.ts @@ -65,6 +65,7 @@ export { inboundFiltersIntegration, initOpenTelemetry, isInitialized, + kafkaIntegration, koaIntegration, lastEventId, linkedErrorsIntegration, diff --git a/packages/aws-serverless/src/index.ts b/packages/aws-serverless/src/index.ts index f648dba045ec..19c90e3aef3f 100644 --- a/packages/aws-serverless/src/index.ts +++ b/packages/aws-serverless/src/index.ts @@ -89,6 +89,7 @@ export { fsIntegration, genericPoolIntegration, graphqlIntegration, + kafkaIntegration, mongoIntegration, mongooseIntegration, mysqlIntegration, diff --git a/packages/bun/src/index.ts b/packages/bun/src/index.ts index 1d8b02c33568..fcb3d1331f46 100644 --- a/packages/bun/src/index.ts +++ b/packages/bun/src/index.ts @@ -110,6 +110,7 @@ export { setupConnectErrorHandler, genericPoolIntegration, graphqlIntegration, + kafkaIntegration, mongoIntegration, mongooseIntegration, mysqlIntegration, diff --git a/packages/google-cloud-serverless/src/index.ts b/packages/google-cloud-serverless/src/index.ts index 463a0c5c1246..14aa0996cb7c 100644 --- a/packages/google-cloud-serverless/src/index.ts +++ b/packages/google-cloud-serverless/src/index.ts @@ -89,6 +89,7 @@ export { fastifyIntegration, genericPoolIntegration, graphqlIntegration, + kafkaIntegration, mongoIntegration, mongooseIntegration, mysqlIntegration, diff --git a/packages/node/package.json b/packages/node/package.json index 63c6f719a2d4..9deef7bc4fe3 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -78,6 +78,7 @@ "@opentelemetry/instrumentation-hapi": "0.41.0", "@opentelemetry/instrumentation-http": "0.53.0", "@opentelemetry/instrumentation-ioredis": "0.43.0", + "@opentelemetry/instrumentation-kafkajs": "0.3.0", "@opentelemetry/instrumentation-koa": "0.43.0", "@opentelemetry/instrumentation-mongodb": "0.47.0", "@opentelemetry/instrumentation-mongoose": "0.42.0", diff --git a/packages/node/src/index.ts b/packages/node/src/index.ts index 6ce3c325e3ff..d4cbcb9544a9 100644 --- a/packages/node/src/index.ts +++ b/packages/node/src/index.ts @@ -14,6 +14,7 @@ export { anrIntegration } from './integrations/anr'; export { expressIntegration, expressErrorHandler, setupExpressErrorHandler } from './integrations/tracing/express'; export { fastifyIntegration, setupFastifyErrorHandler } from './integrations/tracing/fastify'; export { graphqlIntegration } from './integrations/tracing/graphql'; +export { kafkaIntegration } from './integrations/tracing/kafka'; export { mongoIntegration } from './integrations/tracing/mongo'; export { mongooseIntegration } from './integrations/tracing/mongoose'; export { mysqlIntegration } from './integrations/tracing/mysql'; diff --git a/packages/node/src/integrations/tracing/index.ts b/packages/node/src/integrations/tracing/index.ts index 46a9f79e4caa..69ffc24a8be2 100644 --- a/packages/node/src/integrations/tracing/index.ts +++ b/packages/node/src/integrations/tracing/index.ts @@ -7,6 +7,7 @@ import { fastifyIntegration, instrumentFastify } from './fastify'; import { genericPoolIntegration, instrumentGenericPool } from './genericPool'; import { graphqlIntegration, instrumentGraphql } from './graphql'; import { hapiIntegration, instrumentHapi } from './hapi'; +import { instrumentKafka, kafkaIntegration } from './kafka'; import { instrumentKoa, koaIntegration } from './koa'; import { instrumentMongo, mongoIntegration } from './mongo'; import { instrumentMongoose, mongooseIntegration } from './mongoose'; @@ -39,6 +40,7 @@ export function getAutoPerformanceIntegrations(): Integration[] { koaIntegration(), connectIntegration(), genericPoolIntegration(), + kafkaIntegration(), ]; } @@ -53,6 +55,7 @@ export function getOpenTelemetryInstrumentationToPreload(): (((options?: any) => instrumentConnect, instrumentFastify, instrumentHapi, + instrumentKafka, instrumentKoa, instrumentNest, instrumentMongo, diff --git a/packages/node/src/integrations/tracing/kafka.ts b/packages/node/src/integrations/tracing/kafka.ts new file mode 100644 index 000000000000..7bdab00459e1 --- /dev/null +++ b/packages/node/src/integrations/tracing/kafka.ts @@ -0,0 +1,37 @@ +import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs'; + +import { defineIntegration } from '@sentry/core'; +import type { IntegrationFn } from '@sentry/types'; +import { generateInstrumentOnce } from '../../otel/instrument'; +import { addOriginToSpan } from '../../utils/addOriginToSpan'; + +const INTEGRATION_NAME = 'Kafka'; + +export const instrumentKafka = generateInstrumentOnce( + INTEGRATION_NAME, + () => + new KafkaJsInstrumentation({ + consumerHook(span) { + addOriginToSpan(span, 'auto.kafkajs.otel.consumer'); + }, + producerHook(span) { + addOriginToSpan(span, 'auto.kafkajs.otel.producer'); + }, + }), +); + +const _kafkaIntegration = (() => { + return { + name: INTEGRATION_NAME, + setupOnce() { + instrumentKafka(); + }, + }; +}) satisfies IntegrationFn; + +/** + * KafkaJs integration + * + * Capture tracing data for KafkaJs. + */ +export const kafkaIntegration = defineIntegration(_kafkaIntegration); diff --git a/packages/remix/src/index.server.ts b/packages/remix/src/index.server.ts index 457dcb9f8685..37161b41715e 100644 --- a/packages/remix/src/index.server.ts +++ b/packages/remix/src/index.server.ts @@ -67,6 +67,7 @@ export { inboundFiltersIntegration, initOpenTelemetry, isInitialized, + kafkaIntegration, koaIntegration, lastEventId, linkedErrorsIntegration, diff --git a/packages/solidstart/src/server/index.ts b/packages/solidstart/src/server/index.ts index 995f58d057e3..794106eca715 100644 --- a/packages/solidstart/src/server/index.ts +++ b/packages/solidstart/src/server/index.ts @@ -58,6 +58,7 @@ export { inboundFiltersIntegration, initOpenTelemetry, isInitialized, + kafkaIntegration, koaIntegration, lastEventId, linkedErrorsIntegration, diff --git a/packages/sveltekit/src/server/index.ts b/packages/sveltekit/src/server/index.ts index d57ec35bd7cc..e2902afb400b 100644 --- a/packages/sveltekit/src/server/index.ts +++ b/packages/sveltekit/src/server/index.ts @@ -60,6 +60,7 @@ export { inboundFiltersIntegration, initOpenTelemetry, isInitialized, + kafkaIntegration, koaIntegration, lastEventId, linkedErrorsIntegration, diff --git a/yarn.lock b/yarn.lock index 22770c583774..a9e1df0710b6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7169,6 +7169,14 @@ "@opentelemetry/redis-common" "^0.36.2" "@opentelemetry/semantic-conventions" "^1.27.0" +"@opentelemetry/instrumentation-kafkajs@0.3.0": + version "0.3.0" + resolved "https://registry.yarnpkg.com/@opentelemetry/instrumentation-kafkajs/-/instrumentation-kafkajs-0.3.0.tgz#6687bce4dac8b90ef8ccbf1b662d5d1e95a34414" + integrity sha512-UnkZueYK1ise8FXQeKlpBd7YYUtC7mM8J0wzUSccEfc/G8UqHQqAzIyYCUOUPUKp8GsjLnWOOK/3hJc4owb7Jg== + dependencies: + "@opentelemetry/instrumentation" "^0.53.0" + "@opentelemetry/semantic-conventions" "^1.27.0" + "@opentelemetry/instrumentation-koa@0.43.0": version "0.43.0" resolved "https://registry.yarnpkg.com/@opentelemetry/instrumentation-koa/-/instrumentation-koa-0.43.0.tgz#963fd192a1b5f6cbae5dabf4ec82e3105cbb23b1" @@ -9685,17 +9693,8 @@ dependencies: "@types/unist" "*" -"@types/history-4@npm:@types/history@4.7.8": - version "4.7.8" - resolved "https://registry.yarnpkg.com/@types/history/-/history-4.7.8.tgz#49348387983075705fe8f4e02fb67f7daaec4934" - integrity sha512-S78QIYirQcUoo6UJZx9CSP0O2ix9IaeAXwQi26Rhr/+mg7qqPy8TzaxHSUut7eGjL8WmLccT7/MXf304WjqHcA== - -"@types/history-5@npm:@types/history@4.7.8": - version "4.7.8" - resolved "https://registry.yarnpkg.com/@types/history/-/history-4.7.8.tgz#49348387983075705fe8f4e02fb67f7daaec4934" - integrity sha512-S78QIYirQcUoo6UJZx9CSP0O2ix9IaeAXwQi26Rhr/+mg7qqPy8TzaxHSUut7eGjL8WmLccT7/MXf304WjqHcA== - -"@types/history@*": +"@types/history-4@npm:@types/history@4.7.8", "@types/history-5@npm:@types/history@4.7.8", "@types/history@*": + name "@types/history-4" version "4.7.8" resolved "https://registry.yarnpkg.com/@types/history/-/history-4.7.8.tgz#49348387983075705fe8f4e02fb67f7daaec4934" integrity sha512-S78QIYirQcUoo6UJZx9CSP0O2ix9IaeAXwQi26Rhr/+mg7qqPy8TzaxHSUut7eGjL8WmLccT7/MXf304WjqHcA== @@ -10023,15 +10022,7 @@ "@types/history" "^3" "@types/react" "*" -"@types/react-router-4@npm:@types/react-router@5.1.14": - version "5.1.14" - resolved "https://registry.yarnpkg.com/@types/react-router/-/react-router-5.1.14.tgz#e0442f4eb4c446541ad7435d44a97f8fe6df40da" - integrity sha512-LAJpqYUaCTMT2anZheoidiIymt8MuX286zoVFPM3DVb23aQBH0mAkFvzpd4LKqiolV8bBtZWT5Qp7hClCNDENw== - dependencies: - "@types/history" "*" - "@types/react" "*" - -"@types/react-router-5@npm:@types/react-router@5.1.14": +"@types/react-router-4@npm:@types/react-router@5.1.14", "@types/react-router-5@npm:@types/react-router@5.1.14": version "5.1.14" resolved "https://registry.yarnpkg.com/@types/react-router/-/react-router-5.1.14.tgz#e0442f4eb4c446541ad7435d44a97f8fe6df40da" integrity sha512-LAJpqYUaCTMT2anZheoidiIymt8MuX286zoVFPM3DVb23aQBH0mAkFvzpd4LKqiolV8bBtZWT5Qp7hClCNDENw== @@ -22431,6 +22422,11 @@ jws@^4.0.0: jwa "^2.0.0" safe-buffer "^5.0.1" +kafkajs@2.2.4: + version "2.2.4" + resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.2.4.tgz#59e6e16459d87fdf8b64be73970ed5aa42370a5b" + integrity sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA== + kareem@2.3.2: version "2.3.2" resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.3.2.tgz#78c4508894985b8d38a0dc15e1a8e11078f2ca93" @@ -28434,7 +28430,8 @@ react-is@^18.0.0: dependencies: "@remix-run/router" "1.0.2" -"react-router-6@npm:react-router@6.3.0": +"react-router-6@npm:react-router@6.3.0", react-router@6.3.0: + name react-router-6 version "6.3.0" resolved "https://registry.yarnpkg.com/react-router/-/react-router-6.3.0.tgz#3970cc64b4cb4eae0c1ea5203a80334fdd175557" integrity sha512-7Wh1DzVQ+tlFjkeo+ujvjSqSJmkt1+8JO+T5xklPlgrh70y7ogx75ODRW0ThWhY7S+6yEDks8TYrtQe/aoboBQ== @@ -28449,13 +28446,6 @@ react-router-dom@^6.2.2: history "^5.2.0" react-router "6.3.0" -react-router@6.3.0: - version "6.3.0" - resolved "https://registry.yarnpkg.com/react-router/-/react-router-6.3.0.tgz#3970cc64b4cb4eae0c1ea5203a80334fdd175557" - integrity sha512-7Wh1DzVQ+tlFjkeo+ujvjSqSJmkt1+8JO+T5xklPlgrh70y7ogx75ODRW0ThWhY7S+6yEDks8TYrtQe/aoboBQ== - dependencies: - history "^5.2.0" - react@^18.0.0: version "18.0.0" resolved "https://registry.yarnpkg.com/react/-/react-18.0.0.tgz#b468736d1f4a5891f38585ba8e8fb29f91c3cb96" @@ -30934,16 +30924,7 @@ string-template@~0.2.1: resolved "https://registry.yarnpkg.com/string-template/-/string-template-0.2.1.tgz#42932e598a352d01fc22ec3367d9d84eec6c9add" integrity sha1-QpMuWYo1LQH8IuwzZ9nYTuxsmt0= -"string-width-cjs@npm:string-width@^4.2.0": - version "4.2.3" - resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" - integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== - dependencies: - emoji-regex "^8.0.0" - is-fullwidth-code-point "^3.0.0" - strip-ansi "^6.0.1" - -string-width@4.2.3, string-width@^4.2.0, string-width@^4.2.2, string-width@^4.2.3: +"string-width-cjs@npm:string-width@^4.2.0", string-width@4.2.3, string-width@^4.2.0, string-width@^4.2.2, string-width@^4.2.3: version "4.2.3" resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== @@ -31055,14 +31036,7 @@ stringify-object@^3.2.1: is-obj "^1.0.1" is-regexp "^1.0.0" -"strip-ansi-cjs@npm:strip-ansi@^6.0.1": - version "6.0.1" - resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" - integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== - dependencies: - ansi-regex "^5.0.1" - -strip-ansi@6.0.1, strip-ansi@^6.0.0, strip-ansi@^6.0.1: +"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@6.0.1, strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== @@ -34031,16 +34005,7 @@ wrangler@^3.67.1: optionalDependencies: fsevents "~2.3.2" -"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0": - version "7.0.0" - resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" - integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== - dependencies: - ansi-styles "^4.0.0" - string-width "^4.1.0" - strip-ansi "^6.0.0" - -wrap-ansi@7.0.0, wrap-ansi@^7.0.0: +"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0", wrap-ansi@7.0.0, wrap-ansi@^7.0.0: version "7.0.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==