From 9b41377c3885cf12be3c0f49bd2745200b0d07d3 Mon Sep 17 00:00:00 2001 From: Peter Somogyvari Date: Thu, 4 Apr 2024 23:47:31 -0700 Subject: [PATCH] feat(plugin-keychain-memory): add observability via RxJS ReplaySubjects 1. This is an example of how to add observability to a plugin such as if you had to somehow expose the stream of transaction execution requests flowing through a connector plugin but did not feel like setting up Kafka or RabbitMQ just for this and instead opted to do it with an in-process, purely NodeJS/Javascript based solution. 2. The downside of this is of course that this doesn't work well in a distributed computing environment just by itself, since if you were to host a fleet of servers running the same connector plugin with horizontal scaling, then this wouldn't be able to observe all the invocations across the server fleet, but it would still make it easier to implement a functionality like that. 3. The main purpose of this pull request is educational. The keychain memory plugin is only used for testing and demonstration purposes and I wanted to show to a few other contributors what I meant when I was explaining that they could just use RxJS subjects to allow consumers of the connector plugins to observe the stream of transactions flowing through said connector plugin instance. Signed-off-by: Peter Somogyvari --- .../package.json | 1 + .../main/typescript/plugin-keychain-memory.ts | 73 +++++++++++++++- ...ugin-keychain-memory-observability.test.ts | 84 +++++++++++++++++++ yarn.lock | 1 + 4 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 packages/cactus-plugin-keychain-memory/src/test/typescript/unit/plugin-keychain-memory-observability.test.ts diff --git a/packages/cactus-plugin-keychain-memory/package.json b/packages/cactus-plugin-keychain-memory/package.json index ac5c83ad03..559d346b29 100644 --- a/packages/cactus-plugin-keychain-memory/package.json +++ b/packages/cactus-plugin-keychain-memory/package.json @@ -64,6 +64,7 @@ "axios": "1.6.0", "express": "4.19.2", "prom-client": "13.2.0", + "rxjs": "7.8.1", "uuid": "9.0.1" }, "devDependencies": { diff --git a/packages/cactus-plugin-keychain-memory/src/main/typescript/plugin-keychain-memory.ts b/packages/cactus-plugin-keychain-memory/src/main/typescript/plugin-keychain-memory.ts index 75d2a23f86..7c0de0135f 100644 --- a/packages/cactus-plugin-keychain-memory/src/main/typescript/plugin-keychain-memory.ts +++ b/packages/cactus-plugin-keychain-memory/src/main/typescript/plugin-keychain-memory.ts @@ -26,12 +26,15 @@ import { HasKeychainEntryV1Endpoint } from "./web-services/has-keychain-entry-en import { DefaultService } from "./generated/crpc/services/default_service_connect"; import { KeychainMemoryCrpcSvcOpenApi } from "./crpc-services/keychain-memory-crpc-svc-openapi"; import { ServiceType } from "@bufbuild/protobuf"; +import { Observable, ReplaySubject, Subject } from "rxjs"; export interface IPluginKeychainMemoryOptions extends ICactusPluginOptions { logLevel?: LogLevelDesc; backend?: Map; keychainId: string; prometheusExporter?: PrometheusExporter; + readonly observabilityBufferSize?: number; + readonly observabilityTtlSeconds?: number; } export class PluginKeychainMemory @@ -42,6 +45,22 @@ export class PluginKeychainMemory private readonly backend: Map; private readonly log: Logger; private readonly instanceId: string; + private readonly observabilityBufferSize: number; + private readonly observabilityTtlSeconds: number; + + private readonly getSubject: Subject<{ + readonly key: string; + readonly value: string; + }>; + private readonly setSubject: Subject<{ + readonly key: string; + readonly value: string; + }>; + private readonly hasSubject: Subject<{ + readonly key: string; + readonly isPresent: boolean; + }>; + private readonly deleteSubject: Subject<{ readonly key: string }>; private endpoints: IWebServiceEndpoint[] | undefined; public prometheusExporter: PrometheusExporter; @@ -56,6 +75,9 @@ export class PluginKeychainMemory Checks.truthy(opts.instanceId, `${fnTag} options.instanceId`); Checks.nonBlankString(opts.keychainId, `${fnTag} options.keychainId`); + this.observabilityBufferSize = opts.observabilityBufferSize || 1; + this.observabilityTtlSeconds = opts.observabilityTtlSeconds || 1; + this.backend = opts.backend || new Map(); Checks.truthy(this.backend, `${fnTag} arg options.backend`); @@ -63,6 +85,9 @@ export class PluginKeychainMemory const label = this.className; this.log = LoggerProvider.getOrCreate({ level, label }); + this.log.debug("observabilityBufferSize=%o", this.observabilityBufferSize); + this.log.debug("observabilityTtlSeconds=%o", this.observabilityTtlSeconds); + this.instanceId = this.opts.instanceId; this.prometheusExporter = opts.prometheusExporter || @@ -78,6 +103,23 @@ export class PluginKeychainMemory `Never use ${this.className} in production. ` + `It does not support encryption. It stores everything in plain text.`, ); + + this.getSubject = new ReplaySubject( + this.observabilityBufferSize, + this.observabilityTtlSeconds, + ); + this.setSubject = new ReplaySubject( + this.observabilityBufferSize, + this.observabilityTtlSeconds, + ); + this.hasSubject = new ReplaySubject( + this.observabilityBufferSize, + this.observabilityTtlSeconds, + ); + this.deleteSubject = new ReplaySubject( + this.observabilityBufferSize, + this.observabilityTtlSeconds, + ); } public getOpenApiSpec(): unknown { @@ -181,23 +223,52 @@ export class PluginKeychainMemory async get(key: string): Promise { const value = this.backend.get(key); if (value) { + this.getSubject.next({ key, value }); return value; } else { throw new Error(`Keychain entry for "${key}" not found.`); } } + public observeGet(): Observable<{ readonly key: string }> { + return this.getSubject.asObservable(); + } + async has(key: string): Promise { - return this.backend.has(key); + const isPresent = this.backend.has(key); + this.hasSubject.next({ key, isPresent }); + return isPresent; + } + + public observeHas(): Observable<{ + readonly key: string; + readonly isPresent: boolean; + }> { + return this.hasSubject.asObservable(); } async set(key: string, value: string): Promise { this.backend.set(key, value); + this.setSubject.next({ key, value }); this.prometheusExporter.setTotalKeyCounter(this.backend.size); } + public observeSet(): Observable<{ + readonly key: string; + readonly value: string; + }> { + return this.setSubject.asObservable(); + } + async delete(key: string): Promise { this.backend.delete(key); + this.deleteSubject.next({ key }); this.prometheusExporter.setTotalKeyCounter(this.backend.size); } + + public observeDelete(): Observable<{ + readonly key: string; + }> { + return this.deleteSubject.asObservable(); + } } diff --git a/packages/cactus-plugin-keychain-memory/src/test/typescript/unit/plugin-keychain-memory-observability.test.ts b/packages/cactus-plugin-keychain-memory/src/test/typescript/unit/plugin-keychain-memory-observability.test.ts new file mode 100644 index 0000000000..cb608d540d --- /dev/null +++ b/packages/cactus-plugin-keychain-memory/src/test/typescript/unit/plugin-keychain-memory-observability.test.ts @@ -0,0 +1,84 @@ +import "jest-extended"; +import { v4 as uuidV4 } from "uuid"; + +import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common"; + +import { PluginKeychainMemory } from "../../../main/typescript/public-api"; + +const logLevel: LogLevelDesc = "INFO"; + +describe("PluginKeychainMemory", () => { + const log = LoggerProvider.getOrCreate({ + label: "plugin-keychain-memory-observability.test.ts", + level: logLevel, + }); + + test("can observe set operations", async () => { + const keychain = new PluginKeychainMemory({ + instanceId: uuidV4(), + keychainId: uuidV4(), + logLevel, + }); + + let getCount = 0; + const stratedAt = new Date(); + + const taskPromise = new Promise((resolve) => { + keychain.observeSet().subscribe({ + next: (value) => { + getCount++; + log.debug("NEXT_SET: startedAt=%o value=%o", stratedAt, value); + if (getCount >= 5) { + resolve(); + } + }, + }); + + keychain.set("some-key-that-does-not-matter-1", uuidV4()); + keychain.set("some-key-that-does-not-matter-2", uuidV4()); + keychain.set("some-key-that-does-not-matter-3", uuidV4()); + keychain.set("some-key-that-does-not-matter-4", uuidV4()); + keychain.set("some-key-that-does-not-matter-5", uuidV4()); + }); + await expect(taskPromise).toResolve(); + }, 500); + + test("can observe set operations with buffer", async () => { + const keychain = new PluginKeychainMemory({ + instanceId: uuidV4(), + keychainId: uuidV4(), + logLevel, + observabilityBufferSize: 5, + observabilityTtlSeconds: 1000, + }); + + let getCount = 0; + const stratedAt = new Date(); + + keychain.set("some-key-that-does-not-matter-1", uuidV4()); + keychain.set("some-key-that-does-not-matter-2", uuidV4()); + keychain.set("some-key-that-does-not-matter-3", uuidV4()); + keychain.set("some-key-that-does-not-matter-4", uuidV4()); + keychain.set("some-key-that-does-not-matter-5", uuidV4()); + + const taskPromise = new Promise((resolve) => { + keychain.observeSet().subscribe({ + next: (value) => { + getCount++; + log.debug("NEXT_SET_1: startedAt=%o value=%o", stratedAt, value); + }, + }); + keychain.observeSet().subscribe({ + next: (value) => { + getCount++; + log.debug("NEXT_SET_2: startedAt=%o value=%o", stratedAt, value); + if (getCount >= 10) { + resolve(); + } + }, + }); + }); + + await expect(taskPromise).toResolve(); + }, 500); +}); diff --git a/yarn.lock b/yarn.lock index 43d32123fc..ab3a56345e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8457,6 +8457,7 @@ __metadata: express: "npm:4.19.2" npm-run-all2: "npm:6.1.2" prom-client: "npm:13.2.0" + rxjs: "npm:7.8.1" uuid: "npm:9.0.1" languageName: unknown linkType: soft