Skip to content

Commit

Permalink
fix(hub connection): correctly close the websocket connection when `d…
Browse files Browse the repository at this point in the history
…isconnect` is called right after `connect` (#54)

### Features

- **hub connection:** ability to configure microsoft `HubConnection` directly via `configureSignalRHubConnection`

### Bug Fixes

- **hub connection:** correctly close the websocket connection when `disconnect` is called right after `connect`
- **hub connection:** `connectionState$` `connecting` will also be set when connecting not only when reconnecting
  • Loading branch information
stephenlautier authored Nov 23, 2022
1 parent 4e94c71 commit d3b297b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 36 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## [3.1.0](https://github.com/sketch7/signalr-client/compare/3.0.0...3.1.0) (2022-11-22)

### Features

- **hub connection:** ability to configure microsoft `HubConnection` directly via `configureSignalRHubConnection`

### Bug Fixes

- **hub connection:** correctly close the websocket connection when `disconnect` is called right after `connect`
- **hub connection:** `connectionState$` `connecting` will also be set when connecting not only when reconnecting

## [3.0.0](https://github.com/sketch7/signalr-client/compare/2.0.0...3.0.0) (2021-04-30)

### Features
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ssv/signalr-client",
"version": "3.0.0",
"version": "3.1.0",
"versionSuffix": "",
"description": "SignalR client library built on top of @microsoft/signalr. This gives you more features and easier to use.",
"homepage": "https://github.com/sketch7/signalr-client",
Expand Down
31 changes: 29 additions & 2 deletions src/hub-connection.connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ describe("HubConnection Specs", () => {

beforeEach(() => {
hubBackend.connection.start = jest.fn().mockReturnValue(promiseDelayResolve(5));
hubBackend.connection.stop = jest.fn().mockReturnValue(promiseDelayResolve(5));
});


describe("and connects successfully", () => {



// connect -> WHILE CONNECTING -> disconnect
it("should have status disconnected", done => {
const connect$ = SUT.connect();
const state$ = SUT.connectionState$.pipe(
Expand All @@ -81,13 +82,39 @@ describe("HubConnection Specs", () => {
withLatestFrom(SUT.connectionState$, (_x, y) => y),
tap(state => {
expect(hubBackend.connection.start).toHaveBeenCalledTimes(1);
expect(hubBackend.connection.stop).toHaveBeenCalledTimes(1);
expect(state.status).toBe(ConnectionStatus.disconnected);
done();
})
);
conn$$ = merge(connect$, state$).subscribe();
});

describe("and connects with different data", () => {


// connect -> WHILE CONNECTING -> disconnect -> connect with different data
it("should have status connected", done => {
const connect$ = SUT.connect();
const state$ = SUT.connectionState$.pipe(
first(),
switchMap(() => SUT.disconnect()),
delay(2), // ensure start is in flight
switchMap(() => SUT.connect(() => ({ second: "true" }))),
withLatestFrom(SUT.connectionState$, (_x, y) => y),
tap(state => {
expect(hubBackend.connection.start).toHaveBeenCalledTimes(2);
expect(hubBackend.connection.stop).toHaveBeenCalledTimes(1);
expect(state.status).toBe(ConnectionStatus.connected);
done();
})
);
conn$$ = merge(connect$, state$).subscribe();
});



});


});
Expand Down Expand Up @@ -184,7 +211,7 @@ describe("HubConnection Specs", () => {
tap(state => {
expect(state.status).toBe(ConnectionStatus.disconnected);
expect(hubStartSpy).toBeCalledTimes(1);
expect(hubStopSpy).not.toBeCalled();
// expect(hubStopSpy).not.toBeCalled();
done();
}),
);
Expand Down
6 changes: 5 additions & 1 deletion src/hub-connection.model.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IHttpConnectionOptions, IHubProtocol } from "@microsoft/signalr";
import { HubConnection, IHttpConnectionOptions, IHubProtocol } from "@microsoft/signalr";

import { Dictionary } from "./utils/dictionary";

Expand Down Expand Up @@ -38,6 +38,10 @@ export interface HubConnectionOptions {
/** @internal */
getData?: () => Dictionary<string>;
protocol?: IHubProtocol;
/**
* Configures the SignalR Hub connection after it has been built (raw) in order to access/configure `serverTimeoutInMilliseconds`, `keepAliveIntervalInMilliseconds` etc...
*/
configureSignalRHubConnection?: (hubConnection: HubConnection) => void;
}

export interface ConnectionOptions extends IHttpConnectionOptions {
Expand Down
67 changes: 35 additions & 32 deletions src/hub-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import {
} from "rxjs/operators";
import {
HubConnection as SignalRHubConnection,
HubConnectionBuilder as SignalRHubConnectionBuilder
HubConnectionBuilder as SignalRHubConnectionBuilder,
} from "@microsoft/signalr";
import { from as fromPromise, BehaviorSubject, Observable, Observer, timer, throwError, Subscription, merge } from "rxjs";
import { from, BehaviorSubject, Observable, Observer, timer, throwError, Subject } from "rxjs";

import {
ConnectionState, ConnectionStatus, HubConnectionOptions,
Expand All @@ -20,6 +20,7 @@ import { emptyNext } from "./utils/rxjs";
const errorReasonName = "error";
const disconnectedState = Object.freeze<ConnectionState>({ status: ConnectionStatus.disconnected });
const connectedState = Object.freeze<ConnectionState>({ status: ConnectionStatus.connected });
const connectingState = Object.freeze<ConnectionState>({ status: ConnectionStatus.connecting });

// todo: rename HubClient?
export class HubConnection<THub> {
Expand All @@ -36,7 +37,7 @@ export class HubConnection<THub> {
private desiredState$ = new BehaviorSubject<DesiredConnectionStatus>(DesiredConnectionStatus.disconnected);
private internalConnStatus$ = new BehaviorSubject<InternalConnectionStatus>(InternalConnectionStatus.disconnected);
private connectionBuilder = new SignalRHubConnectionBuilder();
private effects$$ = Subscription.EMPTY;
private readonly _destroy$ = new Subject<void>();

private waitUntilConnect$ = this.connectionState$.pipe(
distinctUntilChanged((x, y) => x.status === y.status),
Expand All @@ -62,7 +63,9 @@ export class HubConnection<THub> {
if (connectionOpts.protocol) {
this.connectionBuilder = this.connectionBuilder.withHubProtocol(connectionOpts.protocol);
}

this.hubConnection = this.connectionBuilder.build();
connectionOpts.configureSignalRHubConnection?.(this.hubConnection);
this.hubConnection.onclose(err => {
this.internalConnStatus$.next(InternalConnectionStatus.disconnected);
if (err) {
Expand All @@ -81,42 +84,40 @@ export class HubConnection<THub> {
tap(() => this.internalConnStatus$.next(InternalConnectionStatus.ready)),
filter(() => prevConnectionStatus === InternalConnectionStatus.connected),
switchMap(() => this.openConnection())
))
)),
takeUntil(this._destroy$),
);
const desiredDisconnected$ = this.desiredState$.pipe(
filter(status => status === DesiredConnectionStatus.disconnected),
// tap(status => console.warn(">>>> disconnected$", { internalConnStatus$: this.internalConnStatus$.value, desiredStatus: status })),
// tap(status => console.warn(">>>> [desiredDisconnected$] desired disconnected", { internalConnStatus$: this.internalConnStatus$.value, desiredStatus: status })),
tap(() => {
switch (this.internalConnStatus$.value) {
case InternalConnectionStatus.connected:
this._disconnect();
break;
case InternalConnectionStatus.ready:
this._connectionState$.next(disconnectedState);
break;
// default:
// console.error("desiredDisconnected$ - State unhandled", this.internalConnStatus$.value);
// break;
if (this._connectionState$.value.status !== ConnectionStatus.disconnected) {
// console.warn(">>>> [desiredDisconnected$] _disconnect");
// note: ideally delayWhen disconnect first, though for some reason obs not bubbling
this._disconnect()
}
})
}),
tap(() => this._connectionState$.next(disconnectedState)),
takeUntil(this._destroy$),
);

const reconnectOnDisconnect = this._connectionState$.pipe(
const reconnectOnDisconnect$ = this._connectionState$.pipe(
// tap(x => console.warn(">>>> _connectionState$ state changed", x)),
filter(x => x.status === ConnectionStatus.disconnected && x.reason === errorReasonName),
// tap(x => console.warn(">>>> reconnecting...", x)),
switchMap(() => this.connect())
switchMap(() => this.connect()),
takeUntil(this._destroy$),
);

this.effects$$ = merge(
[
desiredDisconnected$,
reconnectOnDisconnect,
reconnectOnDisconnect$,
connection$
).subscribe();
].forEach((x: Observable<unknown>) => x.subscribe());
}

connect(data?: () => Dictionary<string>): Observable<void> {
// console.info("triggered connect", data);
// console.warn("[connect] init", data);
this.desiredState$.next(DesiredConnectionStatus.connected);
if (this.internalConnStatus$.value === InternalConnectionStatus.connected) {
console.warn(`${this.source} session already connected`);
Expand All @@ -142,7 +143,7 @@ export class HubConnection<THub> {
}

disconnect(): Observable<void> {
// console.info("triggered disconnect");
// console.warn("[disconnect] init");
this.desiredState$.next(DesiredConnectionStatus.disconnected);
return this.untilDisconnects$();
}
Expand Down Expand Up @@ -188,25 +189,26 @@ export class HubConnection<THub> {
}

send(methodName: keyof THub | "StreamUnsubscribe", ...args: unknown[]): Observable<void> {
return fromPromise(this.hubConnection.send(methodName.toString(), ...args));
return from(this.hubConnection.send(methodName.toString(), ...args));
}

invoke<TResult>(methodName: keyof THub, ...args: unknown[]): Observable<TResult> {
return fromPromise<Promise<TResult>>(this.hubConnection.invoke(methodName.toString(), ...args));
return from<Promise<TResult>>(this.hubConnection.invoke(methodName.toString(), ...args));
}

dispose(): void {
this.disconnect();
this.desiredState$.complete();
this._connectionState$.complete();
this.internalConnStatus$.complete();
this.effects$$.unsubscribe();
this._destroy$.next();
this._destroy$.complete();
}

private _disconnect(): Observable<void> {
// console.info("triggered _disconnect", this.internalConnStatus$.value);
return this.internalConnStatus$.value === InternalConnectionStatus.connected
? fromPromise(this.hubConnection.stop())
// console.warn("[_disconnect] init", this.internalConnStatus$.value, this._connectionState$.value);
return this._connectionState$.value.status !== ConnectionStatus.disconnected
? from(this.hubConnection.stop())
: emptyNext();
}

Expand All @@ -225,11 +227,12 @@ export class HubConnection<THub> {
}

private openConnection() {
// console.info("triggered openConnection");
// console.warn("[openConnection]");
return emptyNext().pipe(
// tap(x => console.warn(">>>> openConnection - attempting to connect", x)),
switchMap(() => fromPromise(this.hubConnection.start())),
// tap(x => console.warn(">>>> openConnection - connection established", x)),
tap(() => this._connectionState$.next(connectingState)),
switchMap(() => from(this.hubConnection.start())),
// tap(x => console.warn(">>>> [openConnection] - connection established", x)),
retryWhen(errors => errors.pipe(
scan((errorCount: number) => ++errorCount, 0),
delayWhen((retryCount: number) => {
Expand Down

0 comments on commit d3b297b

Please sign in to comment.