From 30a6b52c501e0621c620ceeb45a2c4b851330033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Mon, 18 Nov 2024 15:23:44 +0100 Subject: [PATCH 1/3] fix(microservices): grpc client streaming bugs #14094 #13818 --- packages/microservices/server/server-grpc.ts | 84 +++++++++++++++++--- 1 file changed, 72 insertions(+), 12 deletions(-) diff --git a/packages/microservices/server/server-grpc.ts b/packages/microservices/server/server-grpc.ts index f27e0e7217e..817a1d192d9 100644 --- a/packages/microservices/server/server-grpc.ts +++ b/packages/microservices/server/server-grpc.ts @@ -6,6 +6,7 @@ import { import { EMPTY, Observable, + ReplaySubject, Subject, Subscription, defaultIfEmpty, @@ -165,7 +166,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { if (!methodHandler) { continue; } - service[methodName] = await this.createServiceMethod( + service[methodName] = this.createServiceMethod( methodHandler, grpcService.prototype[methodName], streamingType, @@ -174,7 +175,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { return service; } - getMessageHandler( + public getMessageHandler( serviceName: string, methodName: string, streaming: GrpcMethodStreamingType, @@ -278,7 +279,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { source: Observable, call: GrpcCall, ): Promise { - // this promise should **not** reject, as we're handling errors in the observable for the Call + // This promise should **not** reject, as we're handling errors in the observable for the Call // the promise is only needed to signal when writing/draining has been completed return new Promise((resolve, _doNotUse) => { const valuesWaitingToBeDrained: T[] = []; @@ -380,8 +381,12 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { call: GrpcCall, callback: (err: unknown, value: unknown) => void, ) => { - const req = new Subject(); - call.on('data', (m: any) => req.next(m)); + // Needs to be a ReplaySubject in order to buffer messages that come before handler is executed + // This could happen if handler has any async guards or interceptors registered that would delay + // the execution. + const { subject, next, error, complete } = + this.bufferUntilFirstSubscription(); + call.on('data', (m: any) => next(m)); call.on('error', (e: any) => { // Check if error means that stream ended on other end const isCancelledError = String(e).toLowerCase().indexOf('cancelled'); @@ -391,18 +396,22 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { return; } // If another error then just pass it along - req.error(e); + error(e); }); - call.on('end', () => req.complete()); + call.on('end', () => complete()); - const handler = methodHandler(req.asObservable(), call.metadata, call); + const handler = methodHandler( + subject.asObservable(), + call.metadata, + call, + ); const res = this.transformToObservable(await handler); if (isResponseStream) { await this.writeObservableToGrpc(res, call); } else { const response = await lastValueFrom( res.pipe( - takeUntil(fromEvent(call as any, CANCEL_EVENT)), + takeUntil(fromEvent(call as any, CANCELLED_EVENT)), catchError(err => { callback(err, null); return EMPTY; @@ -426,11 +435,15 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { call: GrpcCall, callback: (err: unknown, value: unknown) => void, ) => { + let handlerStream: Observable; if (isResponseStream) { - methodHandler(call); + handlerStream = this.transformToObservable(await methodHandler(call)); } else { - methodHandler(call, callback); + handlerStream = this.transformToObservable( + await methodHandler(call, callback), + ); } + await lastValueFrom(handlerStream); }; } @@ -469,7 +482,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { this.messageHandlers.set(route, callback); } - public async createClient(): Promise { + public async createClient() { const channelOptions: ChannelOptions = this.options && this.options.channelOptions ? this.options.channelOptions @@ -612,4 +625,51 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { ); } } + + private bufferUntilFirstSubscription() { + const subject = new Subject(); + const replayBuffer = new ReplaySubject(); + let hasSubscribed = false; + + return { + subject: new Proxy(subject, { + get(target, prop, receiver) { + if (prop === 'subscribe' && !hasSubscribed) { + hasSubscribed = true; + + // Replay buffered values to the new subscriber + // Schedule this operation in the next tick to let the subscriber + // to be registered before emitting values + process.nextTick(() => { + replayBuffer.subscribe(target); + replayBuffer.complete(); + }); + } + return Reflect.get(target, prop, receiver); + }, + }), + next: (value: T) => { + if (!hasSubscribed) { + replayBuffer.next(value); + } + subject.next(value); + }, + error: (err: any) => { + if (!hasSubscribed) { + replayBuffer.error(err); + } + subject.error(err); + }, + complete: () => { + if (!hasSubscribed) { + replayBuffer.complete(); + // Replay buffer is no longer needed + // Return early to allow subject to complete later, after the replay buffer + // has been drained + return; + } + subject.complete(); + }, + }; + } } From 45645039362e381f8cea8fa6a75130bb75c37c9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Mon, 18 Nov 2024 15:27:45 +0100 Subject: [PATCH 2/3] fix: use valid reference for the cancel event constant --- packages/microservices/server/server-grpc.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/microservices/server/server-grpc.ts b/packages/microservices/server/server-grpc.ts index 817a1d192d9..4a32983e490 100644 --- a/packages/microservices/server/server-grpc.ts +++ b/packages/microservices/server/server-grpc.ts @@ -411,7 +411,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { } else { const response = await lastValueFrom( res.pipe( - takeUntil(fromEvent(call as any, CANCELLED_EVENT)), + takeUntil(fromEvent(call as any, CANCEL_EVENT)), catchError(err => { callback(err, null); return EMPTY; From b65c41c5a8927593ec834887e83ab95018d9ff28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Wed, 20 Nov 2024 10:21:10 +0100 Subject: [PATCH 3/3] fix: use proxy, drain buffer before execution --- .../decorators/message-pattern.decorator.ts | 42 ++++++++++++-- packages/microservices/server/server-grpc.ts | 58 ++++++++++++------- 2 files changed, 74 insertions(+), 26 deletions(-) diff --git a/packages/microservices/decorators/message-pattern.decorator.ts b/packages/microservices/decorators/message-pattern.decorator.ts index a9f9a748cc5..c20a2377c0f 100644 --- a/packages/microservices/decorators/message-pattern.decorator.ts +++ b/packages/microservices/decorators/message-pattern.decorator.ts @@ -1,23 +1,23 @@ import { - isObject, - isNumber, isNil, + isNumber, + isObject, isSymbol, } from '@nestjs/common/utils/shared.utils'; /* eslint-disable @typescript-eslint/no-use-before-define */ import { + PATTERN_EXTRAS_METADATA, PATTERN_HANDLER_METADATA, PATTERN_METADATA, TRANSPORT_METADATA, - PATTERN_EXTRAS_METADATA, } from '../constants'; -import { PatternHandler } from '../enums/pattern-handler.enum'; -import { PatternMetadata } from '../interfaces/pattern-metadata.interface'; import { Transport } from '../enums'; +import { PatternHandler } from '../enums/pattern-handler.enum'; import { InvalidGrpcDecoratorException, RpcDecoratorMetadata, } from '../errors/invalid-grpc-message-decorator.exception'; +import { PatternMetadata } from '../interfaces/pattern-metadata.interface'; export enum GrpcMethodStreamingType { NO_STREAMING = 'no_stream', @@ -141,7 +141,37 @@ export function GrpcStreamMethod( method, GrpcMethodStreamingType.RX_STREAMING, ); - return MessagePattern(metadata, Transport.GRPC)(target, key, descriptor); + + MessagePattern(metadata, Transport.GRPC)(target, key, descriptor); + + const originalMethod = descriptor.value; + + // Override original method to call the "drainBuffer" method on the first parameter + // This is required to avoid premature message emission + descriptor.value = async function ( + this: any, + observable: any, + ...args: any[] + ) { + const result = await Promise.resolve( + originalMethod.apply(this, [observable, ...args]), + ); + + // Drain buffer if "drainBuffer" method is available + if (observable && observable.drainBuffer) { + process.nextTick(() => { + observable.drainBuffer(); + }); + } + return result; + }; + + // Copy all metadata from the original method to the new one + const metadataKeys = Reflect.getMetadataKeys(originalMethod); + metadataKeys.forEach(metadataKey => { + const metadataValue = Reflect.getMetadata(metadataKey, originalMethod); + Reflect.defineMetadata(metadataKey, metadataValue, descriptor.value); + }); }; } diff --git a/packages/microservices/server/server-grpc.ts b/packages/microservices/server/server-grpc.ts index 4a32983e490..4cfb5399398 100644 --- a/packages/microservices/server/server-grpc.ts +++ b/packages/microservices/server/server-grpc.ts @@ -381,11 +381,10 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { call: GrpcCall, callback: (err: unknown, value: unknown) => void, ) => { - // Needs to be a ReplaySubject in order to buffer messages that come before handler is executed + // Needs to be a Proxy in order to buffer messages that come before handler is executed // This could happen if handler has any async guards or interceptors registered that would delay // the execution. - const { subject, next, error, complete } = - this.bufferUntilFirstSubscription(); + const { subject, next, error, complete } = this.bufferUntilDrained(); call.on('data', (m: any) => next(m)); call.on('error', (e: any) => { // Check if error means that stream ended on other end @@ -626,42 +625,61 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { } } - private bufferUntilFirstSubscription() { + private bufferUntilDrained() { + type DrainableSubject = Subject & { drainBuffer: () => void }; + const subject = new Subject(); const replayBuffer = new ReplaySubject(); - let hasSubscribed = false; + let hasDrained = false; + + function drainBuffer(this: DrainableSubject) { + if (hasDrained) { + return; + } + hasDrained = true; + + // Replay buffered values to the new subscriber + replayBuffer.subscribe({ + next: val => console.log('emitted', val), + }); + replayBuffer.complete(); + } return { - subject: new Proxy(subject, { + subject: new Proxy>(subject as DrainableSubject, { get(target, prop, receiver) { - if (prop === 'subscribe' && !hasSubscribed) { - hasSubscribed = true; - - // Replay buffered values to the new subscriber - // Schedule this operation in the next tick to let the subscriber - // to be registered before emitting values - process.nextTick(() => { - replayBuffer.subscribe(target); - replayBuffer.complete(); - }); + if (prop === 'asObservable') { + return () => { + const stream = subject.asObservable(); + + // "drainBuffer" will be called before the evaluation of the handler + // but after any enhancers have been applied (e.g., `interceptors`) + Object.defineProperty(stream, drainBuffer.name, { + value: drainBuffer, + }); + return stream; + }; + } + if (hasDrained) { + return Reflect.get(target, prop, receiver); } - return Reflect.get(target, prop, receiver); + return Reflect.get(replayBuffer, prop, receiver); }, }), next: (value: T) => { - if (!hasSubscribed) { + if (!hasDrained) { replayBuffer.next(value); } subject.next(value); }, error: (err: any) => { - if (!hasSubscribed) { + if (!hasDrained) { replayBuffer.error(err); } subject.error(err); }, complete: () => { - if (!hasSubscribed) { + if (!hasDrained) { replayBuffer.complete(); // Replay buffer is no longer needed // Return early to allow subject to complete later, after the replay buffer