From d3b297be9b9996c10335ccebd3921b16d3d50d50 Mon Sep 17 00:00:00 2001 From: Stephen Lautier Date: Wed, 23 Nov 2022 13:08:46 +0100 Subject: [PATCH] fix(hub connection): correctly close the websocket connection when `disconnect` is called right after `connect` (#54) ### Features - **hub connection:** ability to configure microsoft `HubConnection` directly via `configureSignalRHubConnection` ### Bug Fixes - **hub connection:** correctly close the websocket connection when `disconnect` is called right after `connect` - **hub connection:** `connectionState$` `connecting` will also be set when connecting not only when reconnecting --- CHANGELOG.md | 11 +++++ package.json | 2 +- src/hub-connection.connection.spec.ts | 31 ++++++++++++- src/hub-connection.model.ts | 6 ++- src/hub-connection.ts | 67 ++++++++++++++------------- 5 files changed, 81 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90f452e..3d22b3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## [3.1.0](https://github.com/sketch7/signalr-client/compare/3.0.0...3.1.0) (2022-11-22) + +### Features + +- **hub connection:** ability to configure microsoft `HubConnection` directly via `configureSignalRHubConnection` + +### Bug Fixes + +- **hub connection:** correctly close the websocket connection when `disconnect` is called right after `connect` +- **hub connection:** `connectionState$` `connecting` will also be set when connecting not only when reconnecting + ## [3.0.0](https://github.com/sketch7/signalr-client/compare/2.0.0...3.0.0) (2021-04-30) ### Features diff --git a/package.json b/package.json index d1d57f1..2bdcb54 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ssv/signalr-client", - "version": "3.0.0", + "version": "3.1.0", "versionSuffix": "", "description": "SignalR client library built on top of @microsoft/signalr. This gives you more features and easier to use.", "homepage": "https://github.com/sketch7/signalr-client", diff --git a/src/hub-connection.connection.spec.ts b/src/hub-connection.connection.spec.ts index 682d9dd..ceccce6 100644 --- a/src/hub-connection.connection.spec.ts +++ b/src/hub-connection.connection.spec.ts @@ -65,13 +65,14 @@ describe("HubConnection Specs", () => { beforeEach(() => { hubBackend.connection.start = jest.fn().mockReturnValue(promiseDelayResolve(5)); + hubBackend.connection.stop = jest.fn().mockReturnValue(promiseDelayResolve(5)); }); describe("and connects successfully", () => { - + // connect -> WHILE CONNECTING -> disconnect it("should have status disconnected", done => { const connect$ = SUT.connect(); const state$ = SUT.connectionState$.pipe( @@ -81,6 +82,7 @@ describe("HubConnection Specs", () => { withLatestFrom(SUT.connectionState$, (_x, y) => y), tap(state => { expect(hubBackend.connection.start).toHaveBeenCalledTimes(1); + expect(hubBackend.connection.stop).toHaveBeenCalledTimes(1); expect(state.status).toBe(ConnectionStatus.disconnected); done(); }) @@ -88,6 +90,31 @@ describe("HubConnection Specs", () => { conn$$ = merge(connect$, state$).subscribe(); }); + describe("and connects with different data", () => { + + + // connect -> WHILE CONNECTING -> disconnect -> connect with different data + it("should have status connected", done => { + const connect$ = SUT.connect(); + const state$ = SUT.connectionState$.pipe( + first(), + switchMap(() => SUT.disconnect()), + delay(2), // ensure start is in flight + switchMap(() => SUT.connect(() => ({ second: "true" }))), + withLatestFrom(SUT.connectionState$, (_x, y) => y), + tap(state => { + expect(hubBackend.connection.start).toHaveBeenCalledTimes(2); + expect(hubBackend.connection.stop).toHaveBeenCalledTimes(1); + expect(state.status).toBe(ConnectionStatus.connected); + done(); + }) + ); + conn$$ = merge(connect$, state$).subscribe(); + }); + + + + }); }); @@ -184,7 +211,7 @@ describe("HubConnection Specs", () => { tap(state => { expect(state.status).toBe(ConnectionStatus.disconnected); expect(hubStartSpy).toBeCalledTimes(1); - expect(hubStopSpy).not.toBeCalled(); + // expect(hubStopSpy).not.toBeCalled(); done(); }), ); diff --git a/src/hub-connection.model.ts b/src/hub-connection.model.ts index 8ad4734..a3ed28b 100644 --- a/src/hub-connection.model.ts +++ b/src/hub-connection.model.ts @@ -1,4 +1,4 @@ -import { IHttpConnectionOptions, IHubProtocol } from "@microsoft/signalr"; +import { HubConnection, IHttpConnectionOptions, IHubProtocol } from "@microsoft/signalr"; import { Dictionary } from "./utils/dictionary"; @@ -38,6 +38,10 @@ export interface HubConnectionOptions { /** @internal */ getData?: () => Dictionary; protocol?: IHubProtocol; + /** + * Configures the SignalR Hub connection after it has been built (raw) in order to access/configure `serverTimeoutInMilliseconds`, `keepAliveIntervalInMilliseconds` etc... + */ + configureSignalRHubConnection?: (hubConnection: HubConnection) => void; } export interface ConnectionOptions extends IHttpConnectionOptions { diff --git a/src/hub-connection.ts b/src/hub-connection.ts index c903e88..c56c472 100644 --- a/src/hub-connection.ts +++ b/src/hub-connection.ts @@ -4,9 +4,9 @@ import { } from "rxjs/operators"; import { HubConnection as SignalRHubConnection, - HubConnectionBuilder as SignalRHubConnectionBuilder + HubConnectionBuilder as SignalRHubConnectionBuilder, } from "@microsoft/signalr"; -import { from as fromPromise, BehaviorSubject, Observable, Observer, timer, throwError, Subscription, merge } from "rxjs"; +import { from, BehaviorSubject, Observable, Observer, timer, throwError, Subject } from "rxjs"; import { ConnectionState, ConnectionStatus, HubConnectionOptions, @@ -20,6 +20,7 @@ import { emptyNext } from "./utils/rxjs"; const errorReasonName = "error"; const disconnectedState = Object.freeze({ status: ConnectionStatus.disconnected }); const connectedState = Object.freeze({ status: ConnectionStatus.connected }); +const connectingState = Object.freeze({ status: ConnectionStatus.connecting }); // todo: rename HubClient? export class HubConnection { @@ -36,7 +37,7 @@ export class HubConnection { private desiredState$ = new BehaviorSubject(DesiredConnectionStatus.disconnected); private internalConnStatus$ = new BehaviorSubject(InternalConnectionStatus.disconnected); private connectionBuilder = new SignalRHubConnectionBuilder(); - private effects$$ = Subscription.EMPTY; + private readonly _destroy$ = new Subject(); private waitUntilConnect$ = this.connectionState$.pipe( distinctUntilChanged((x, y) => x.status === y.status), @@ -62,7 +63,9 @@ export class HubConnection { if (connectionOpts.protocol) { this.connectionBuilder = this.connectionBuilder.withHubProtocol(connectionOpts.protocol); } + this.hubConnection = this.connectionBuilder.build(); + connectionOpts.configureSignalRHubConnection?.(this.hubConnection); this.hubConnection.onclose(err => { this.internalConnStatus$.next(InternalConnectionStatus.disconnected); if (err) { @@ -81,42 +84,40 @@ export class HubConnection { tap(() => this.internalConnStatus$.next(InternalConnectionStatus.ready)), filter(() => prevConnectionStatus === InternalConnectionStatus.connected), switchMap(() => this.openConnection()) - )) + )), + takeUntil(this._destroy$), ); const desiredDisconnected$ = this.desiredState$.pipe( filter(status => status === DesiredConnectionStatus.disconnected), - // tap(status => console.warn(">>>> disconnected$", { internalConnStatus$: this.internalConnStatus$.value, desiredStatus: status })), + // tap(status => console.warn(">>>> [desiredDisconnected$] desired disconnected", { internalConnStatus$: this.internalConnStatus$.value, desiredStatus: status })), tap(() => { - switch (this.internalConnStatus$.value) { - case InternalConnectionStatus.connected: - this._disconnect(); - break; - case InternalConnectionStatus.ready: - this._connectionState$.next(disconnectedState); - break; - // default: - // console.error("desiredDisconnected$ - State unhandled", this.internalConnStatus$.value); - // break; + if (this._connectionState$.value.status !== ConnectionStatus.disconnected) { + // console.warn(">>>> [desiredDisconnected$] _disconnect"); + // note: ideally delayWhen disconnect first, though for some reason obs not bubbling + this._disconnect() } - }) + }), + tap(() => this._connectionState$.next(disconnectedState)), + takeUntil(this._destroy$), ); - const reconnectOnDisconnect = this._connectionState$.pipe( + const reconnectOnDisconnect$ = this._connectionState$.pipe( // tap(x => console.warn(">>>> _connectionState$ state changed", x)), filter(x => x.status === ConnectionStatus.disconnected && x.reason === errorReasonName), // tap(x => console.warn(">>>> reconnecting...", x)), - switchMap(() => this.connect()) + switchMap(() => this.connect()), + takeUntil(this._destroy$), ); - this.effects$$ = merge( + [ desiredDisconnected$, - reconnectOnDisconnect, + reconnectOnDisconnect$, connection$ - ).subscribe(); + ].forEach((x: Observable) => x.subscribe()); } connect(data?: () => Dictionary): Observable { - // console.info("triggered connect", data); + // console.warn("[connect] init", data); this.desiredState$.next(DesiredConnectionStatus.connected); if (this.internalConnStatus$.value === InternalConnectionStatus.connected) { console.warn(`${this.source} session already connected`); @@ -142,7 +143,7 @@ export class HubConnection { } disconnect(): Observable { - // console.info("triggered disconnect"); + // console.warn("[disconnect] init"); this.desiredState$.next(DesiredConnectionStatus.disconnected); return this.untilDisconnects$(); } @@ -188,11 +189,11 @@ export class HubConnection { } send(methodName: keyof THub | "StreamUnsubscribe", ...args: unknown[]): Observable { - return fromPromise(this.hubConnection.send(methodName.toString(), ...args)); + return from(this.hubConnection.send(methodName.toString(), ...args)); } invoke(methodName: keyof THub, ...args: unknown[]): Observable { - return fromPromise>(this.hubConnection.invoke(methodName.toString(), ...args)); + return from>(this.hubConnection.invoke(methodName.toString(), ...args)); } dispose(): void { @@ -200,13 +201,14 @@ export class HubConnection { this.desiredState$.complete(); this._connectionState$.complete(); this.internalConnStatus$.complete(); - this.effects$$.unsubscribe(); + this._destroy$.next(); + this._destroy$.complete(); } private _disconnect(): Observable { - // console.info("triggered _disconnect", this.internalConnStatus$.value); - return this.internalConnStatus$.value === InternalConnectionStatus.connected - ? fromPromise(this.hubConnection.stop()) + // console.warn("[_disconnect] init", this.internalConnStatus$.value, this._connectionState$.value); + return this._connectionState$.value.status !== ConnectionStatus.disconnected + ? from(this.hubConnection.stop()) : emptyNext(); } @@ -225,11 +227,12 @@ export class HubConnection { } private openConnection() { - // console.info("triggered openConnection"); + // console.warn("[openConnection]"); return emptyNext().pipe( // tap(x => console.warn(">>>> openConnection - attempting to connect", x)), - switchMap(() => fromPromise(this.hubConnection.start())), - // tap(x => console.warn(">>>> openConnection - connection established", x)), + tap(() => this._connectionState$.next(connectingState)), + switchMap(() => from(this.hubConnection.start())), + // tap(x => console.warn(">>>> [openConnection] - connection established", x)), retryWhen(errors => errors.pipe( scan((errorCount: number) => ++errorCount, 0), delayWhen((retryCount: number) => {