Skip to content

Commit

Permalink
add WEBSOCKETS flag, allow to connect to either NATS protocol or WS
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Sherman <[email protected]>
  • Loading branch information
usingtechnology committed Sep 5, 2024
1 parent bd1f8cf commit fcc9885
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 19 deletions.
3 changes: 2 additions & 1 deletion .devcontainer/chefs_local/local.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@
},
"eventStreamService": {
"servers": "localhost:4222,localhost:4223,localhost:4224",
"websockets": "false",
"streamName": "CHEFS",
"source": "chefs",
"source": "chefs-local",
"domain": "forms",
"username": "chefs",
"password": "password"
Expand Down
1 change: 1 addition & 0 deletions app/config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
},
"eventStreamService": {
"servers": "EVENTSTREAMSERVICE_SERVERS",
"websockets": "EVENTSTREAMSERVICE_WEBSOCKETS",
"streamName": "EVENTSTREAMSERVICE_STREAMNAME",
"source": "EVENTSTREAMSERVICE_SOURCE",
"domain": "EVENTSTREAMSERVICE_DOMAIN",
Expand Down
3 changes: 2 additions & 1 deletion app/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@
},
"eventStreamService": {
"servers": "localhost:4222,localhost:4223,localhost:4224",
"websockets": "false",
"streamName": "CHEFS",
"source": "chefs",
"source": "chefs-local",
"domain": "forms",
"username": "chefs",
"password": "password"
Expand Down
35 changes: 27 additions & 8 deletions app/src/components/eventStreamService.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
const config = require('config');
const falsey = require('falsey');
const { JSONCodec } = require('nats');

// different connection libraries if we are using websockerts or nats protocols.
const WEBSOCKETS = !falsey(config.get('eventStreamService.websockets'));

let natsConnect;
if (WEBSOCKETS) {
// shim the websocket library
globalThis.WebSocket = require('websocket').w3cwebsocket;
const { connect } = require('nats.ws');
natsConnect = connect;
} else {
const { connect } = require('nats');
natsConnect = connect;
}

// shim the websocket library
globalThis.WebSocket = require('websocket').w3cwebsocket;
const { connect } = require('nats.ws');
const log = require('./log')(module.filename);

const { FormVersion, Form, FormEventStreamConfig } = require('../forms/common/models');
Expand All @@ -23,6 +36,8 @@ const SUBMISSION_EVENT_TYPES = {
DELETED: 'deleted',
};

const jsonCodec = JSONCodec();

class DummyEventStreamService {
// this class should not be called if we actually check that this feature is enabled.
// however... if we missed that check these calls will do nothing.
Expand Down Expand Up @@ -111,7 +126,7 @@ class EventStreamService {
});

// we either timeout or connect...
const result = await Promise.race([connect(me.natsOptions), timeoutPromise]);
const result = await Promise.race([natsConnect(me.natsOptions), timeoutPromise]);

if (timeout) {
clearTimeout(timeout);
Expand Down Expand Up @@ -196,15 +211,17 @@ class EventStreamService {
data: encPayload,
},
};
const ack = await this.js.publish(privateSubj, JSON.stringify(privMsg));
const encodedPayload = jsonCodec.encode(privMsg);
const ack = await this.js.publish(privateSubj, encodedPayload);
log.info(`form ${eventType} event (private) - formId: ${formId}, version: ${formVersion.version}, seq: ${ack.seq}`, { function: 'onPublish' });
}
if (evntStrmCfg.enablePublicStream) {
const pubMsg = {
meta: meta,
payload: {},
};
const ack = await this.js.publish(publicSubj, JSON.stringify(pubMsg));
const encodedPayload = jsonCodec.encode(pubMsg);
const ack = await this.js.publish(publicSubj, encodedPayload);
log.info(`form ${eventType} event (public) - formId: ${formId}, version: ${formVersion.version}, seq: ${ack.seq}`, { function: 'onPublish' });
}
} else {
Expand Down Expand Up @@ -254,7 +271,8 @@ class EventStreamService {
data: encPayload,
},
};
const ack = await this.js.publish(privateSubj, JSON.stringify(privMsg));
const encodedPayload = jsonCodec.encode(privMsg);
const ack = await this.js.publish(privateSubj, encodedPayload);
log.info(`submission ${eventType} event (private) - formId: ${formVersion.formId}, version: ${formVersion.version}, submissionId: ${submission.id}, seq: ${ack.seq}`, {
function: 'onSubmit',
});
Expand All @@ -264,7 +282,8 @@ class EventStreamService {
meta: meta,
payload: {},
};
const ack = await this.js.publish(publicSubj, JSON.stringify(pubMsg));
const encodedPayload = jsonCodec.encode(pubMsg);
const ack = await this.js.publish(publicSubj, encodedPayload);
log.info(`submission ${eventType} event (public) - formId: ${formVersion.formId}, version: ${formVersion.version}, submissionId: ${submission.id}, seq: ${ack.seq}`, {
function: 'onSubmit',
});
Expand Down
9 changes: 9 additions & 0 deletions event-stream-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions event-stream-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"scripts": {},
"dependencies": {
"cryptr": "^6.3.0",
"falsey": "^1.0.0",
"nats": "^2.28.0",
"nats.ws": "^1.29.2",
"websocket": "^1.0.35"
Expand Down
39 changes: 31 additions & 8 deletions event-stream-service/pullConsumer.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
const { AckPolicy } = require("nats");
const { AckPolicy, JSONCodec } = require("nats");
const Cryptr = require("cryptr");
const falsey = require("falsey");
/*
command line pass in environment variables for:
SERVERS - which nats instance to connect to (default is local from docker-compose)
WEBSOCKET - connect via nats protocol or websockets. true/false (default false, connect via nats)
ENCRYPTION_KEY - what encryption key to decrypt (Cryptr - aes-256-gcm) private payloads
// shim the websocket library
globalThis.WebSocket = require("websocket").w3cwebsocket;
const { connect } = require("nats.ws");
Example:
SERVERS=ess-a191b5-dev.apps.silver.devops.gov.bc.ca WEBSOCKETS=true ENCRYPTION_KEY=ad5520469720325d1694c87511afda28a0432dd974cb77b5b4b9f946a5af6985 node pullConsumer.js
*/

// different connection libraries if we are using websockerts or nats protocols.
const WEBSOCKETS = !falsey(process.env.WEBSOCKETS);

let natsConnect;
if (WEBSOCKETS) {
console.log("connect via ws");
// shim the websocket library
globalThis.WebSocket = require("websocket").w3cwebsocket;
const { connect } = require("nats.ws");
natsConnect = connect;
} else {
console.log("connect via nats");
const { connect } = require("nats");
natsConnect = connect;
}

// connection info
let servers = [];
if (process.env.SERVERS) {
servers = process.env.SERVERS.split(",");
} else {
// running locally
servers = ["localhost:4222", "localhost:4223", "localhost:4224"];
servers = "localhost:4222,localhost:4223,localhost:4224".split(",");
}

let nc = undefined; // nats connection
Expand All @@ -34,8 +56,9 @@ const printMsg = (m) => {
`msg seq: ${m.seq}, subject: ${m.subject}, timestamp: ${ts}, streamSequence: ${m.info.streamSequence}, deliverySequence: ${m.info.deliverySequence}`
);
// illustrate (one way of) grabbing message content as json
const data = m.json();
console.log(JSON.stringify(data, null, 2));
const jsonCodec = JSONCodec();
const data = jsonCodec.decode(m.data);
console.log(data);
try {
if (
data &&
Expand Down Expand Up @@ -67,7 +90,7 @@ const init = async () => {
// no credentials provided.
// anonymous connections have read access to the stream
console.log(`connect to nats server(s) ${servers} as 'anonymous'...`);
nc = await connect({
nc = await natsConnect({
servers: servers,
reconnectTimeWait: 10 * 1000, // 10s
});
Expand Down
4 changes: 3 additions & 1 deletion openshift/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,18 @@ oc create -n $NAMESPACE secret generic $APP_NAME-encryption-keys \
--from-literal=proxy=$proxy_key
```

We need to store a password for Event Stream Service client. Since the server(s) will change along with the password, we will store the server and credentials in a secret per environment (DEV, TEST, PROD). Pull requests can use the same as DEV.
We need to store a password for Event Stream Service client. Since the server(s) will change along with the password, we will store the server and credentials in a secret per environment (DEV, TEST, PROD) and whether we connect with WebSockets or NATS protocols Pull requests can use the same as DEV.

```sh

export ess_servers=<comma separated list of event stream servers>
export ess_websockets=<true/false - true if connection is made via websockets>
export ess_password=<chefs password from event stream service>

oc create -n $NAMESPACE secret generic $APP_NAME-event-stream-service \
--type=Opaque \
--from-literal=servers=$ess_servers \
--from-literal=websockets=$ess_websockets \
--from-literal=username=chefs \
--from-literal=password=$ess_password
```
Expand Down
5 changes: 5 additions & 0 deletions openshift/app.dc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ objects:
secretKeyRef:
key: servers
name: "${APP_NAME}-event-stream-service"
- name: EVENTSTREAMSERVICE_WEBSOCKETS
valueFrom:
secretKeyRef:
key: websockets
name: "${APP_NAME}-event-stream-service"
- name: EVENTSTREAMSERVICE_USERNAME
valueFrom:
secretKeyRef:
Expand Down

0 comments on commit fcc9885

Please sign in to comment.