Skip to content

Commit

Permalink
#141 : Fix PrivateTopics (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien-Molina authored and maurelio1234 committed Jul 24, 2018
1 parent 9d0918d commit f67e8e6
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 67 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "reactivexcomponent.js",
"version": "3.1.7",
"version": "3.1.8",
"description": "Javascript reactive client API for XComponent",
"directories": {
"test": "test",
Expand Down
6 changes: 1 addition & 5 deletions src/communication/WebSocketPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ApiConfiguration } from "../configuration/apiConfiguration";
import { Header, Event, Data, Serializer} from "./xcomponentMessages";
import { Header, Data, Serializer} from "./xcomponentMessages";
import { StateMachineRef } from "../interfaces/StateMachineRef";
import { PrivateTopics } from "../interfaces/PrivateTopics";

Expand Down Expand Up @@ -31,10 +31,6 @@ export class WebSocketPublisher {
return false;
}

public dispose(): void {

}

private getDataToSend(componentName: string, stateMachineName: string, messageType: string, jsonMessage: any, visibilityPrivate: boolean = false, specifiedPrivateTopic: string = undefined): Data {
const componentCode = this.configuration.getComponentCode(componentName);
const stateMachineCode = this.configuration.getStateMachineCode(componentName, stateMachineName);
Expand Down
15 changes: 6 additions & 9 deletions src/communication/WebSocketSession.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@

import { WebSocketPublisher } from "./WebSocketPublisher";
import { WebSocketSubscriber } from "./WebSocketSubscriber";
import { Utils } from "./Utils";
import { Kinds } from "../configuration/xcWebSocketBridgeConfiguration";
import * as definition from "definition";
import { ApiConfiguration } from "../configuration/apiConfiguration";
import { Session } from "../interfaces/Session";
import { PrivateTopics } from "../interfaces/PrivateTopics";
Expand All @@ -16,10 +13,11 @@ export class WebSocketSession implements Session {
private subscriber: WebSocketSubscriber;
public privateTopics: PrivateTopics;

constructor(private webSocket: WebSocket, private configuration: ApiConfiguration, private sessionData?: string) {
this.privateTopics = new PrivateTopics();
constructor(private webSocket: WebSocket, configuration: ApiConfiguration, sessionData?: string) {
this.subscriber = new WebSocketSubscriber(this.webSocket, configuration);
this.privateTopics = new PrivateTopics(this.subscriber);
this.publisher = new WebSocketPublisher(this.webSocket, configuration, this.privateTopics, sessionData);
this.subscriber = new WebSocketSubscriber(this.webSocket, configuration, this.publisher, this.privateTopics);
this.subscriber.setStateMachineRefSendPublisher(this.publisher);
}

public send(componentName: string, stateMachineName: string, messageType: string, jsonMessage: any, visibilityPrivate: boolean = false, specifiedPrivateTopic: string = undefined): void {
Expand All @@ -31,7 +29,7 @@ export class WebSocketSession implements Session {
}

public getSnapshot(componentName: string, stateMachineName: string): Promise<Array<StateMachineInstance>> {
return this.subscriber.getSnapshot(componentName, stateMachineName);
return this.subscriber.getSnapshot(componentName, stateMachineName, this.privateTopics);
}

public getStateMachineUpdates(componentName: string, stateMachineName: string): Observable<StateMachineInstance> {
Expand All @@ -52,7 +50,6 @@ export class WebSocketSession implements Session {
}

public dispose(): void {
this.publisher.dispose();
this.subscriber.dispose();
this.privateTopics.dispose();
}
}
25 changes: 11 additions & 14 deletions src/communication/WebSocketSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@ import { Commands, Kinds } from "../configuration/xcWebSocketBridgeConfiguration
import { ApiConfiguration, SubscriberEventType } from "../configuration/apiConfiguration";
import "rxjs/add/operator/toPromise";
import { WebSocketPublisher } from "../communication/WebSocketPublisher";
import {
Component, CompositionModel, DeserializedData, CommandData, Header,
Event, Data, getHeaderWithIncomingType,
import { DeserializedData, Event, Data, getHeaderWithIncomingType,
Serializer, Deserializer, fatalErrorState } from "./xcomponentMessages";
import { } from "./clientMessages";
import { error } from "util";
import { PrivateTopics } from "../interfaces/PrivateTopics";
import { StateMachineInstance } from "../interfaces/StateMachineInstance";
import { StateMachineRef } from "../interfaces/StateMachineRef";
Expand All @@ -20,13 +16,14 @@ import { Logger } from "log4ts";

export class WebSocketSubscriber {
private logger: Logger = Logger.getLogger("WebSocketSubscriber");
private stateMachineRefSendPublisher: WebSocketPublisher;
private subscribedStateMachines: { [componentName: string]: Array<String> };
private updates$: Observable<DeserializedData>;
private deserializer: Deserializer;
private serializer: Serializer;
private timeout: string;

constructor(private webSocket: WebSocket, private configuration: ApiConfiguration, private stateMachineRefSendPublisher: WebSocketPublisher, private privateTopics: PrivateTopics) {
constructor(private webSocket: WebSocket, private configuration: ApiConfiguration) {
this.subscribedStateMachines = {};
this.deserializer = new Deserializer();
this.serializer = new Serializer();
Expand All @@ -35,7 +32,11 @@ export class WebSocketSubscriber {
this.updates$ = observableFromEvent(this.webSocket, "message").pipe(map((rawMessage: MessageEvent) => thisSubscriber.deserializer.deserialize(rawMessage.data || rawMessage)));
}

getSnapshot(componentName: string, stateMachineName: string): Promise<Array<StateMachineInstance>> {
setStateMachineRefSendPublisher(stateMachineRefSendPublisher: WebSocketPublisher) {
this.stateMachineRefSendPublisher = stateMachineRefSendPublisher;
}

getSnapshot(componentName: string, stateMachineName: string, privateTopics: PrivateTopics): Promise<Array<StateMachineInstance>> {
const replyTopic = uuid();
const thisSubscriber = this;
const promise = this.updates$.pipe(
Expand All @@ -47,22 +48,18 @@ export class WebSocketSubscriber {
}))
.toPromise();
this.sendSubscribeRequestToTopic(replyTopic, Kinds.Snapshot);
const dataToSendSnapshot = this.getDataToSendSnapshot(componentName, stateMachineName, replyTopic);
const dataToSendSnapshot = this.getDataToSendSnapshot(componentName, stateMachineName, replyTopic, privateTopics);
this.webSocket.send(thisSubscriber.serializer.convertToWebsocketInputFormat(dataToSendSnapshot));
return promise;
}

public dispose(): void {

}

private getDataToSendSnapshot(componentName: string, stateMachineName: string, replyTopic: string): Data {
private getDataToSendSnapshot(componentName: string, stateMachineName: string, replyTopic: string, privateTopics: PrivateTopics): Data {
const componentCode = this.configuration.getComponentCode(componentName);
const stateMachineCode = this.configuration.getStateMachineCode(componentName, stateMachineName);
let topic = this.configuration.getSnapshotTopic(componentCode);
let jsonMessage = {
Timeout: this.timeout,
CallerPrivateTopic: this.privateTopics.getSubscriberTopics(),
CallerPrivateTopic: privateTopics.getSubscriberTopics(),
ReplyTopic: replyTopic
};
let header = getHeaderWithIncomingType();
Expand Down
23 changes: 17 additions & 6 deletions src/interfaces/PrivateTopics.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Utils } from "../communication/Utils";
import * as uuid from "uuid/v4";
import { WebSocketSubscriber } from "../communication/WebSocketSubscriber";
import { Kinds } from "../configuration/xcWebSocketBridgeConfiguration";

export class PrivateTopics {
private defaultPublisherTopic: string;
private subscriberTopics: Array<string>;
private defaultPublisherTopic: string = uuid();
private subscriberTopics: Array<string> = [];

constructor() {
this.defaultPublisherTopic = uuid();
this.subscriberTopics = [this.defaultPublisherTopic];
constructor(private subscriber: WebSocketSubscriber) {
this.addSubscriberTopic(this.defaultPublisherTopic);
}

public setDefaultPublisherTopic(newDefaultPublisherTopic: string): void {
Expand All @@ -24,15 +25,25 @@ export class PrivateTopics {

public addSubscriberTopic(privateTopic: string): void {
if (privateTopic && this.subscriberTopics.indexOf(privateTopic) === -1) {
this.subscriber.sendSubscribeRequestToTopic(privateTopic, Kinds.Private);
this.subscriberTopics.push(privateTopic);
}
}

public removeSubscriberTopic(privateTopic: string): void {
Utils.removeElementFromArray(this.subscriberTopics, privateTopic);
if (privateTopic && this.subscriberTopics.indexOf(privateTopic) !== -1) {
this.subscriber.sendUnsubscribeRequestToTopic(privateTopic, Kinds.Private);
Utils.removeElementFromArray(this.subscriberTopics, privateTopic);
}
}

public getSubscriberTopics(): Array<string> {
return this.subscriberTopics;
}

public dispose(): void {
this.subscriberTopics.forEach((subscriberTopic: string) => {
this.subscriber.sendUnsubscribeRequestToTopic(subscriberTopic, Kinds.Private);
}, this);
}
}
15 changes: 6 additions & 9 deletions test/spec/WebSocketConnectionSpec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import { XComponent } from "../../src/XComponent";
import { WebSocket, Server, SocketIO } from "mock-socket";
import { WebSocketConnection } from "../../src/communication/WebSocketConnection";
import { Connection } from "../../src/interfaces/Connection";
import { WebSocket, Server } from "mock-socket";
import { ErrorListener } from "../../src/interfaces/ErrorListener";
import Mock from "./mock/mockSubscriberDependencies";
import pako = require("pako");
import * as uuid from "uuid/v4";
import { log } from "util";

const encodeServerMessage = (strData: string) => {
let binaryString = pako.deflate(strData, { to: "string" });
Expand All @@ -33,7 +30,7 @@ describe("Test Connection module", function () {
let xcApiFileName = "api.xcApi";
new XComponent().connect(serverUrl)
.then(connection => {
return connection.createSession(xcApiFileName)
return connection.createSession(xcApiFileName);
})
.then(session => {
expect(session).not.toBe(null);
Expand All @@ -45,10 +42,10 @@ describe("Test Connection module", function () {

mockServer.on("connection", function (server) {
server.on("message", function (message) {
const getApiResponse = `<deployment>
<clientAPICommunication>
const getApiResponse = `<deployment>
<clientAPICommunication>
</clientAPICommunication>
<codesConverter>
<codesConverter>
</codesConverter>
</deployment>`;
let content = encodeServerMessage(getApiResponse);
Expand Down Expand Up @@ -159,7 +156,7 @@ describe("Test Connection module", function () {
});
});

class FakeErrorHandler implements ErrorListener{
class FakeErrorHandler implements ErrorListener {
constructor(private done) {
}

Expand Down
27 changes: 17 additions & 10 deletions test/spec/WebSocketPublisherSpec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { WebSocketPublisher } from "../../src/communication/WebSocketPublisher";
import Mock from "./mock/mockPublisherDependencies";
import { PrivateTopics } from "../../src/interfaces/PrivateTopics";
import { WebSocketSubscriber } from "../../src/communication/WebSocketSubscriber";

describe("Test xcWebSocketPublisher module", function () {

Expand All @@ -9,44 +10,50 @@ describe("Test xcWebSocketPublisher module", function () {
});

describe("Test send method", function () {
var publisher, privateTopics;
privateTopics = new PrivateTopics();
let publisher, privateTopics;
beforeEach(function () {
publisher = new WebSocketPublisher(Mock.createMockWebSocket(), Mock.configuration, privateTopics, Mock.sessionData);
const webSocket = Mock.createMockWebSocket();
const webSocketSubscriber = new WebSocketSubscriber(webSocket, Mock.configuration);
privateTopics = new PrivateTopics(webSocketSubscriber);
publisher = new WebSocketPublisher(webSocket, Mock.configuration, privateTopics, Mock.sessionData);
});

it("should send a message to the given stateMachine and component", function () {
publisher.send("componentName", "stateMachineName", Mock.messageType, Mock.jsonMessage);
expect(publisher.webSocket.send).toHaveBeenCalledTimes(1);
expect(publisher.webSocket.send).toHaveBeenCalledTimes(2);
expect(publisher.webSocket.send).toHaveBeenCalledWith(Mock.getCorretWebsocketInputFormat(false));
});

it("sould send a message in a private topic to the given stateMachine and component", function () {
privateTopics.setDefaultPublisherTopic(Mock.guiExample);
publisher.send("componentName", "stateMachineName", Mock.messageType, Mock.jsonMessage, true);
expect(publisher.webSocket.send).toHaveBeenCalledTimes(1);
expect(publisher.webSocket.send).toHaveBeenCalledTimes(4);
expect(publisher.webSocket.send).toHaveBeenCalledWith(Mock.getCorretWebsocketInputFormat(true));
});

});

describe("Test sendWithStateMachineRef", function () {
var publisher;
let publisher;
beforeEach(function () {
publisher = new WebSocketPublisher(Mock.createMockWebSocket(), Mock.configuration, new PrivateTopics(), Mock.sessionData);
const webSocket = Mock.createMockWebSocket();
const webSocketSubscriber = new WebSocketSubscriber(webSocket, Mock.configuration);
publisher = new WebSocketPublisher(webSocket, Mock.configuration, new PrivateTopics(webSocketSubscriber), Mock.sessionData);
});

it("sould send a message to the given instance of stateMachine", function () {
publisher.sendWithStateMachineRef(Mock.stateMachineRef, Mock.messageType, Mock.jsonMessage);
expect(publisher.webSocket.send).toHaveBeenCalledTimes(1);
expect(publisher.webSocket.send).toHaveBeenCalledTimes(2);
expect(publisher.webSocket.send).toHaveBeenCalledWith(Mock.corretWebsocketInputFormatForSendSMRef);
});
});

describe("Test canSend", function () {
var publisher;
let publisher;
beforeEach(function () {
publisher = new WebSocketPublisher(Mock.createMockWebSocket(), Mock.configuration, new PrivateTopics(), Mock.sessionData);
const webSocket = Mock.createMockWebSocket();
const webSocketSubscriber = new WebSocketSubscriber(webSocket, Mock.configuration);
publisher = new WebSocketPublisher(webSocket, Mock.configuration, new PrivateTopics(webSocketSubscriber), Mock.sessionData);
});

it("should return true if there is a publisher details and false otherwise", function () {
Expand Down
6 changes: 2 additions & 4 deletions test/spec/WebSocketSessionSpec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { WebSocket, Server, SocketIO } from "mock-socket";
import { WebSocketPublisher } from "../../src/communication/WebSocketPublisher";
import { WebSocketSubscriber } from "../../src/communication/WebSocketSubscriber";
import { WebSocket} from "mock-socket";
import { WebSocketSession } from "../../src/communication/WebSocketSession";


Expand All @@ -14,7 +12,7 @@ describe("Test xcSession module", function () {
const session = new WebSocketSession(mockWebSocket, null);
session.privateTopics.setDefaultPublisherTopic(undefined);
session.privateTopics.addSubscriberTopic(undefined);
expect(mockWebSocket.send).toHaveBeenCalledTimes(0);
expect(mockWebSocket.send).toHaveBeenCalledTimes(1);
});

it("Should add and set correctly the given private topics", () => {
Expand Down
17 changes: 8 additions & 9 deletions test/spec/WebSocketSubscriberSpec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { WebSocketSubscriber } from "../../src/communication/WebSocketSubscriber";
import { Deserializer, Serializer } from "../../src/communication/xcomponentMessages";
import Rx = require("rx");
import { Deserializer } from "../../src/communication/xcomponentMessages";
import Mock from "./mock/mockSubscriberDependencies";
import { EventEmitter } from "events";
import { PrivateTopics } from "../../src/interfaces/PrivateTopics";
Expand All @@ -18,7 +17,7 @@ describe("Test xcWebSocketSubscriber module", function () {
let serverUrl = "wss://" + uuid();
mockServer = Mock.createMockServer(serverUrl);
mockWebSocket = new WebSocket(serverUrl);
subscriber = new WebSocketSubscriber(mockWebSocket, Mock.configuration, undefined, undefined);
subscriber = new WebSocketSubscriber(mockWebSocket, Mock.configuration);
});

it("subscribe to a state machine, subscriberListener callback should be executed when a message is received", function (done) {
Expand Down Expand Up @@ -58,7 +57,7 @@ describe("Test xcWebSocketSubscriber module", function () {
stateMachineName = "stateMachineName";
beforeEach(function () {
mockWebSocket = Mock.createWebSocket();
subscriber = new WebSocketSubscriber(mockWebSocket, Mock.configuration, undefined, undefined);
subscriber = new WebSocketSubscriber(mockWebSocket, Mock.configuration);
});

it("unsubscribe to a subscribed state machine", function () {
Expand All @@ -77,7 +76,7 @@ describe("Test xcWebSocketSubscriber module", function () {
return new Promise((resolve, reject) => {
let mockWebSocket: any = new EventEmitter();
mockWebSocket.send = jest.fn();
const subscriber = new WebSocketSubscriber(mockWebSocket, Mock.configuration, undefined, undefined);
const subscriber = new WebSocketSubscriber(mockWebSocket, Mock.configuration);

let observable = subscriber.getStateMachineUpdates("componentName", "stateMachineName");

Expand All @@ -99,11 +98,11 @@ describe("Test xcWebSocketSubscriber module", function () {
let subscriber, mockServer, mockWebSocket, privateTopics;
beforeEach(function () {
let serverUrl = "wss://" + uuid();
privateTopics = new PrivateTopics();
privateTopics.setDefaultPublisherTopic(Mock.privateTopic);
mockServer = Mock.createMockServer(serverUrl);
mockWebSocket = new WebSocket(serverUrl);
subscriber = new WebSocketSubscriber(mockWebSocket, Mock.configuration, null, privateTopics);
subscriber = new WebSocketSubscriber(mockWebSocket, Mock.configuration);
privateTopics = new PrivateTopics(subscriber);
privateTopics.setDefaultPublisherTopic(Mock.privateTopic);
});

it("send snapshot request, promise resolve method should be executed when a response is received", function (done) {
Expand Down Expand Up @@ -154,7 +153,7 @@ describe("Test xcWebSocketSubscriber module", function () {
});
});
mockWebSocket.onopen = function (e) {
subscriber.getSnapshot("component", "stateMachine")
subscriber.getSnapshot("component", "stateMachine", privateTopics)
.then(items => mockServer.stop(done))
.catch(err => {
console.error(err);
Expand Down

0 comments on commit f67e8e6

Please sign in to comment.