Skip to content

Commit

Permalink
Merge pull request #12954 from gunb0s/feature/nestjs-kafka-emit-batch
Browse files Browse the repository at this point in the history
feat: emit batch
  • Loading branch information
kamilmysliwiec authored Nov 20, 2024
2 parents 99b5a5b + 0f00a34 commit 6094701
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 11 deletions.
73 changes: 62 additions & 11 deletions packages/microservices/client/client-kafka.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { isUndefined } from '@nestjs/common/utils/shared.utils';
import { isNil, isUndefined } from '@nestjs/common/utils/shared.utils';
import {
KAFKA_DEFAULT_BROKER,
KAFKA_DEFAULT_CLIENT,
Expand Down Expand Up @@ -37,6 +37,15 @@ import {
KafkaRequestSerializer,
} from '../serializers/kafka-request.serializer';
import { ClientProxy } from './client-proxy';
import {
connectable,
defer,
Observable,
Subject,
throwError as _throw,
} from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import { InvalidMessageException } from '../errors/invalid-message.exception';

let kafkaPackage: any = {};

Expand Down Expand Up @@ -223,6 +232,58 @@ export class ClientKafka extends ClientProxy {
return this.consumerAssignments;
}

/**
 * Emits a batch of messages to the given pattern (Kafka topic).
 *
 * Dispatch starts immediately (hot observable): the deferred connect +
 * dispatch pipeline is multicast through a Subject and connected before
 * returning, so the batch is sent even if the caller never subscribes.
 *
 * @param pattern topic pattern the batch is published to
 * @param data wrapper holding the array of messages to publish
 * @returns observable that mirrors the dispatch result, or an error
 * observable when pattern/data is nil
 */
public emitBatch<TResult = any, TInput = any>(
  pattern: any,
  data: { messages: TInput[] },
): Observable<TResult> {
  // Reject missing pattern or payload up-front with a typed error stream.
  if (isNil(pattern) || isNil(data)) {
    return _throw(() => new InvalidMessageException());
  }
  // Connect lazily per subscription, then hand the packet to the batch dispatcher.
  const dispatch$ = defer(async () => this.connect()).pipe(
    mergeMap(() => this.dispatchBatchEvent({ pattern, data })),
  );
  // Multicast so the single dispatch is shared by all subscribers;
  // resetOnDisconnect=false keeps the result replayable state stable.
  const multicast$ = connectable(dispatch$, {
    connector: () => new Subject(),
    resetOnDisconnect: false,
  });
  multicast$.connect();
  return multicast$;
}

public commitOffsets(
topicPartitions: TopicPartitionOffsetAndMetadata[],
): Promise<void> {
if (this.consumer) {
return this.consumer.commitOffsets(topicPartitions);
} else {
throw new Error('No consumer initialized');
}
}

protected async dispatchBatchEvent<TInput = any>(
packets: ReadPacket<{ messages: TInput[] }>,
): Promise<any> {
if (packets.data.messages.length === 0) {
return;
}
const pattern = this.normalizePattern(packets.pattern);
const outgoingEvents = await Promise.all(
packets.data.messages.map(message => {
return this.serializer.serialize(message as any, { pattern });
}),
);

const message = Object.assign(
{
topic: pattern,
messages: outgoingEvents,
},
this.options.send || {},
);

return this.producer.send(message);
}

protected async dispatchEvent(packet: OutgoingEvent): Promise<any> {
const pattern = this.normalizePattern(packet.pattern);
const outgoingEvent = await this.serializer.serialize(packet.data, {
Expand Down Expand Up @@ -320,14 +381,4 @@ export class ClientKafka extends ClientProxy {
this.deserializer =
(options && options.deserializer) || new KafkaResponseDeserializer();
}

public commitOffsets(
topicPartitions: TopicPartitionOffsetAndMetadata[],
): Promise<void> {
if (this.consumer) {
return this.consumer.commitOffsets(topicPartitions);
} else {
throw new Error('No consumer initialized');
}
}
}
73 changes: 73 additions & 0 deletions packages/microservices/test/client/client-kafka.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
EachMessagePayload,
KafkaMessage,
} from '../../external/kafka.interface';
import { Observable } from 'rxjs';
import { Producer } from 'kafkajs';

describe('ClientKafka', () => {
const topic = 'test.topic';
Expand Down Expand Up @@ -549,6 +551,77 @@ describe('ClientKafka', () => {
});
});

// Unit tests for ClientKafka#emitBatch.
describe('emitBatch', () => {
// emitBatch must always hand back an rxjs Observable, even for an empty batch.
it(`should return an observable stream`, () => {
const stream$ = client.emitBatch(
{},
{
messages: [],
},
);
expect(stream$ instanceof Observable).to.be.true;
});

// emitBatch is hot: connect() is invoked synchronously inside the call,
// before any subscriber attaches.
it(`should call "connect" immediately`, () => {
const connectSpy = sinon.spy(client, 'connect');
client.emitBatch(
{},
{
messages: [],
},
);
expect(connectSpy.calledOnce).to.be.true;
});

describe('when "connect" throws', () => {
// A synchronous throw from connect() must surface on the error channel
// of the returned observable rather than propagating out of emitBatch.
it('should return Observable with error', () => {
sinon.stub(client, 'connect').callsFake(() => {
throw new Error();
});

const stream$ = client.emitBatch(
{},
{
messages: [],
},
);

// NOTE(review): assertions inside subscribe callbacks are not awaited
// by this synchronous test; a failure here would be an unhandled
// rejection rather than a test failure — consider using done/async.
stream$.subscribe({
next: () => {},
error: err => {
expect(err).to.be.instanceof(Error);
},
});
});
});

describe('when is connected', () => {
beforeEach(() => {
// Short-circuit the broker handshake with a resolved fake producer.
sinon
.stub(client, 'connect')
.callsFake(() => Promise.resolve({} as Producer));
});

it(`should call dispatchBatchEvent`, () => {
const pattern = { test: 3 };
const data = { messages: [] };
const dispatchBatchEventSpy = sinon
.stub()
.callsFake(() => Promise.resolve(true));
const stream$ = client.emitBatch(pattern, data);
// NOTE(review): the stub is installed AFTER emitBatch runs; this only
// works because the stubbed connect resolves on a later microtask, so
// mergeMap reads the replaced method. Fragile ordering — confirm.
client['dispatchBatchEvent'] = dispatchBatchEventSpy;
stream$.subscribe(() => {
expect(dispatchBatchEventSpy.calledOnce).to.be.true;
});
});
});

// Nil pattern/data must yield an error observable, not a thrown exception.
it('should return Observable with error', () => {
const err$ = client.emitBatch(null, null);
expect(err$).to.be.instanceOf(Observable);
});
});

describe('dispatchEvent', () => {
const eventMessage = {
id: undefined,
Expand Down

0 comments on commit 6094701

Please sign in to comment.