diff --git a/.devcontainer/chefs_local/local.json.sample b/.devcontainer/chefs_local/local.json.sample index 9e9365c47..7217a0c0e 100644 --- a/.devcontainer/chefs_local/local.json.sample +++ b/.devcontainer/chefs_local/local.json.sample @@ -66,8 +66,9 @@ }, "eventStreamService": { "servers": "localhost:4222,localhost:4223,localhost:4224", + "websockets": "false", "streamName": "CHEFS", - "source": "chefs", + "source": "chefs-local", "domain": "forms", "username": "chefs", "password": "password" diff --git a/app/config/custom-environment-variables.json b/app/config/custom-environment-variables.json index c0f5c5236..b86b6c27f 100755 --- a/app/config/custom-environment-variables.json +++ b/app/config/custom-environment-variables.json @@ -63,6 +63,7 @@ }, "eventStreamService": { "servers": "EVENTSTREAMSERVICE_SERVERS", + "websockets": "EVENTSTREAMSERVICE_WEBSOCKETS", "streamName": "EVENTSTREAMSERVICE_STREAMNAME", "source": "EVENTSTREAMSERVICE_SOURCE", "domain": "EVENTSTREAMSERVICE_DOMAIN", diff --git a/app/config/default.json b/app/config/default.json index 9846a221f..29429e874 100644 --- a/app/config/default.json +++ b/app/config/default.json @@ -66,8 +66,9 @@ }, "eventStreamService": { "servers": "localhost:4222,localhost:4223,localhost:4224", + "websockets": "false", "streamName": "CHEFS", - "source": "chefs", + "source": "chefs-local", "domain": "forms", "username": "chefs", "password": "password" diff --git a/app/src/components/eventStreamService.js b/app/src/components/eventStreamService.js index 22b68b0ae..a57290029 100644 --- a/app/src/components/eventStreamService.js +++ b/app/src/components/eventStreamService.js @@ -1,8 +1,21 @@ const config = require('config'); +const falsey = require('falsey'); +const { JSONCodec } = require('nats'); + +// different connection libraries if we are using websockerts or nats protocols. +const WEBSOCKETS = !falsey(config.get('eventStreamService.websockets')); + +let natsConnect; +if (WEBSOCKETS) { + // shim the websocket library + globalThis.WebSocket = require('websocket').w3cwebsocket; + const { connect } = require('nats.ws'); + natsConnect = connect; +} else { + const { connect } = require('nats'); + natsConnect = connect; +} -// shim the websocket library -globalThis.WebSocket = require('websocket').w3cwebsocket; -const { connect } = require('nats.ws'); const log = require('./log')(module.filename); const { FormVersion, Form, FormEventStreamConfig } = require('../forms/common/models'); @@ -23,6 +36,8 @@ const SUBMISSION_EVENT_TYPES = { DELETED: 'deleted', }; +const jsonCodec = JSONCodec(); + class DummyEventStreamService { // this class should not be called if we actually check that this feature is enabled. // however... if we missed that check these calls will do nothing. @@ -111,7 +126,7 @@ class EventStreamService { }); // we either timeout or connect... - const result = await Promise.race([connect(me.natsOptions), timeoutPromise]); + const result = await Promise.race([natsConnect(me.natsOptions), timeoutPromise]); if (timeout) { clearTimeout(timeout); @@ -196,7 +211,8 @@ class EventStreamService { data: encPayload, }, }; - const ack = await this.js.publish(privateSubj, JSON.stringify(privMsg)); + const encodedPayload = jsonCodec.encode(privMsg); + const ack = await this.js.publish(privateSubj, encodedPayload); log.info(`form ${eventType} event (private) - formId: ${formId}, version: ${formVersion.version}, seq: ${ack.seq}`, { function: 'onPublish' }); } if (evntStrmCfg.enablePublicStream) { @@ -204,7 +220,8 @@ class EventStreamService { meta: meta, payload: {}, }; - const ack = await this.js.publish(publicSubj, JSON.stringify(pubMsg)); + const encodedPayload = jsonCodec.encode(pubMsg); + const ack = await this.js.publish(publicSubj, encodedPayload); log.info(`form ${eventType} event (public) - formId: ${formId}, version: ${formVersion.version}, seq: ${ack.seq}`, { function: 'onPublish' }); } } else { @@ -254,7 +271,8 @@ class EventStreamService { data: encPayload, }, }; - const ack = await this.js.publish(privateSubj, JSON.stringify(privMsg)); + const encodedPayload = jsonCodec.encode(privMsg); + const ack = await this.js.publish(privateSubj, encodedPayload); log.info(`submission ${eventType} event (private) - formId: ${formVersion.formId}, version: ${formVersion.version}, submissionId: ${submission.id}, seq: ${ack.seq}`, { function: 'onSubmit', }); @@ -264,7 +282,8 @@ class EventStreamService { meta: meta, payload: {}, }; - const ack = await this.js.publish(publicSubj, JSON.stringify(pubMsg)); + const encodedPayload = jsonCodec.encode(pubMsg); + const ack = await this.js.publish(publicSubj, encodedPayload); log.info(`submission ${eventType} event (public) - formId: ${formVersion.formId}, version: ${formVersion.version}, submissionId: ${submission.id}, seq: ${ack.seq}`, { function: 'onSubmit', }); diff --git a/event-stream-service/package-lock.json b/event-stream-service/package-lock.json index 6747ed9c3..d90e0c013 100644 --- a/event-stream-service/package-lock.json +++ b/event-stream-service/package-lock.json @@ -10,6 +10,7 @@ "license": "Apache-2.0", "dependencies": { "cryptr": "^6.3.0", + "falsey": "^1.0.0", "nats": "^2.28.0", "nats.ws": "^1.29.2", "websocket": "^1.0.35" @@ -120,6 +121,14 @@ "type": "^2.7.2" } }, + "node_modules/falsey": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/falsey/-/falsey-1.0.0.tgz", + "integrity": "sha512-zMDNZ/Ipd8MY0+346CPvhzP1AsiVyNfTOayJza4reAIWf72xbkuFUDcJNxSAsQE1b9Bu0wijKb8Ngnh/a7fI5w==", + "engines": { + "node": ">=4" + } + }, "node_modules/is-typedarray": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/is-typedarray/-/is-typedarray-1.0.0.tgz", diff --git a/event-stream-service/package.json b/event-stream-service/package.json index d94fca715..b6913f331 100644 --- a/event-stream-service/package.json +++ b/event-stream-service/package.json @@ -6,6 +6,7 @@ "scripts": {}, "dependencies": { "cryptr": "^6.3.0", + "falsey": "^1.0.0", "nats": "^2.28.0", "nats.ws": "^1.29.2", "websocket": "^1.0.35" diff --git a/event-stream-service/pullConsumer.js b/event-stream-service/pullConsumer.js index 775b0f11e..2ee618651 100644 --- a/event-stream-service/pullConsumer.js +++ b/event-stream-service/pullConsumer.js @@ -1,9 +1,31 @@ -const { AckPolicy } = require("nats"); +const { AckPolicy, JSONCodec } = require("nats"); const Cryptr = require("cryptr"); +const falsey = require("falsey"); +/* + command line pass in environment variables for: + SERVERS - which nats instance to connect to (default is local from docker-compose) + WEBSOCKET - connect via nats protocol or websockets. true/false (default false, connect via nats) + ENCRYPTION_KEY - what encryption key to decrypt (Cryptr - aes-256-gcm) private payloads -// shim the websocket library -globalThis.WebSocket = require("websocket").w3cwebsocket; -const { connect } = require("nats.ws"); + Example: + SERVERS=ess-a191b5-dev.apps.silver.devops.gov.bc.ca WEBSOCKETS=true ENCRYPTION_KEY=ad5520469720325d1694c87511afda28a0432dd974cb77b5b4b9f946a5af6985 node pullConsumer.js +*/ + +// different connection libraries if we are using websockerts or nats protocols. +const WEBSOCKETS = !falsey(process.env.WEBSOCKETS); + +let natsConnect; +if (WEBSOCKETS) { + console.log("connect via ws"); + // shim the websocket library + globalThis.WebSocket = require("websocket").w3cwebsocket; + const { connect } = require("nats.ws"); + natsConnect = connect; +} else { + console.log("connect via nats"); + const { connect } = require("nats"); + natsConnect = connect; +} // connection info let servers = []; @@ -11,7 +33,7 @@ if (process.env.SERVERS) { servers = process.env.SERVERS.split(","); } else { // running locally - servers = ["localhost:4222", "localhost:4223", "localhost:4224"]; + servers = "localhost:4222,localhost:4223,localhost:4224".split(","); } let nc = undefined; // nats connection @@ -34,8 +56,9 @@ const printMsg = (m) => { `msg seq: ${m.seq}, subject: ${m.subject}, timestamp: ${ts}, streamSequence: ${m.info.streamSequence}, deliverySequence: ${m.info.deliverySequence}` ); // illustrate (one way of) grabbing message content as json - const data = m.json(); - console.log(JSON.stringify(data, null, 2)); + const jsonCodec = JSONCodec(); + const data = jsonCodec.decode(m.data); + console.log(data); try { if ( data && @@ -67,7 +90,7 @@ const init = async () => { // no credentials provided. // anonymous connections have read access to the stream console.log(`connect to nats server(s) ${servers} as 'anonymous'...`); - nc = await connect({ + nc = await natsConnect({ servers: servers, reconnectTimeWait: 10 * 1000, // 10s }); diff --git a/openshift/README.md b/openshift/README.md index f0d54d982..058a90b15 100644 --- a/openshift/README.md +++ b/openshift/README.md @@ -157,16 +157,18 @@ oc create -n $NAMESPACE secret generic $APP_NAME-encryption-keys \ --from-literal=proxy=$proxy_key ``` -We need to store a password for Event Stream Service client. Since the server(s) will change along with the password, we will store the server and credentials in a secret per environment (DEV, TEST, PROD). Pull requests can use the same as DEV. +We need to store a password for Event Stream Service client. Since the server(s) will change along with the password, we will store the server and credentials in a secret per environment (DEV, TEST, PROD) and whether we connect with WebSockets or NATS protocols Pull requests can use the same as DEV. ```sh export ess_servers= +export ess_websockets= export ess_password= oc create -n $NAMESPACE secret generic $APP_NAME-event-stream-service \ --type=Opaque \ --from-literal=servers=$ess_servers \ + --from-literal=websockets=$ess_websockets \ --from-literal=username=chefs \ --from-literal=password=$ess_password ``` diff --git a/openshift/app.dc.yaml b/openshift/app.dc.yaml index 1fedf1aba..7afebe3a7 100644 --- a/openshift/app.dc.yaml +++ b/openshift/app.dc.yaml @@ -238,6 +238,11 @@ objects: secretKeyRef: key: servers name: "${APP_NAME}-event-stream-service" + - name: EVENTSTREAMSERVICE_WEBSOCKETS + valueFrom: + secretKeyRef: + key: websockets + name: "${APP_NAME}-event-stream-service" - name: EVENTSTREAMSERVICE_USERNAME valueFrom: secretKeyRef: