diff --git a/packages/cactus-plugin-keychain-memory/package.json b/packages/cactus-plugin-keychain-memory/package.json index ac5c83ad03..559d346b29 100644 --- a/packages/cactus-plugin-keychain-memory/package.json +++ b/packages/cactus-plugin-keychain-memory/package.json @@ -64,6 +64,7 @@ "axios": "1.6.0", "express": "4.19.2", "prom-client": "13.2.0", + "rxjs": "7.8.1", "uuid": "9.0.1" }, "devDependencies": { diff --git a/packages/cactus-plugin-keychain-memory/src/main/typescript/plugin-keychain-memory.ts b/packages/cactus-plugin-keychain-memory/src/main/typescript/plugin-keychain-memory.ts index 75d2a23f86..7c0de0135f 100644 --- a/packages/cactus-plugin-keychain-memory/src/main/typescript/plugin-keychain-memory.ts +++ b/packages/cactus-plugin-keychain-memory/src/main/typescript/plugin-keychain-memory.ts @@ -26,12 +26,15 @@ import { HasKeychainEntryV1Endpoint } from "./web-services/has-keychain-entry-en import { DefaultService } from "./generated/crpc/services/default_service_connect"; import { KeychainMemoryCrpcSvcOpenApi } from "./crpc-services/keychain-memory-crpc-svc-openapi"; import { ServiceType } from "@bufbuild/protobuf"; +import { Observable, ReplaySubject, Subject } from "rxjs"; export interface IPluginKeychainMemoryOptions extends ICactusPluginOptions { logLevel?: LogLevelDesc; backend?: Map; keychainId: string; prometheusExporter?: PrometheusExporter; + readonly observabilityBufferSize?: number; + readonly observabilityTtlSeconds?: number; } export class PluginKeychainMemory @@ -42,6 +45,22 @@ export class PluginKeychainMemory private readonly backend: Map; private readonly log: Logger; private readonly instanceId: string; + private readonly observabilityBufferSize: number; + private readonly observabilityTtlSeconds: number; + + private readonly getSubject: Subject<{ + readonly key: string; + readonly value: string; + }>; + private readonly setSubject: Subject<{ + readonly key: string; + readonly value: string; + }>; + private readonly hasSubject: Subject<{ + readonly key: string; + readonly isPresent: boolean; + }>; + private readonly deleteSubject: Subject<{ readonly key: string }>; private endpoints: IWebServiceEndpoint[] | undefined; public prometheusExporter: PrometheusExporter; @@ -56,6 +75,9 @@ export class PluginKeychainMemory Checks.truthy(opts.instanceId, `${fnTag} options.instanceId`); Checks.nonBlankString(opts.keychainId, `${fnTag} options.keychainId`); + this.observabilityBufferSize = opts.observabilityBufferSize || 1; + this.observabilityTtlSeconds = opts.observabilityTtlSeconds || 1; + this.backend = opts.backend || new Map(); Checks.truthy(this.backend, `${fnTag} arg options.backend`); @@ -63,6 +85,9 @@ export class PluginKeychainMemory const label = this.className; this.log = LoggerProvider.getOrCreate({ level, label }); + this.log.debug("observabilityBufferSize=%o", this.observabilityBufferSize); + this.log.debug("observabilityTtlSeconds=%o", this.observabilityTtlSeconds); + this.instanceId = this.opts.instanceId; this.prometheusExporter = opts.prometheusExporter || @@ -78,6 +103,23 @@ export class PluginKeychainMemory `Never use ${this.className} in production. ` + `It does not support encryption. It stores everything in plain text.`, ); + + this.getSubject = new ReplaySubject( + this.observabilityBufferSize, + this.observabilityTtlSeconds, + ); + this.setSubject = new ReplaySubject( + this.observabilityBufferSize, + this.observabilityTtlSeconds, + ); + this.hasSubject = new ReplaySubject( + this.observabilityBufferSize, + this.observabilityTtlSeconds, + ); + this.deleteSubject = new ReplaySubject( + this.observabilityBufferSize, + this.observabilityTtlSeconds, + ); } public getOpenApiSpec(): unknown { @@ -181,23 +223,52 @@ export class PluginKeychainMemory async get(key: string): Promise { const value = this.backend.get(key); if (value) { + this.getSubject.next({ key, value }); return value; } else { throw new Error(`Keychain entry for "${key}" not found.`); } } + public observeGet(): Observable<{ readonly key: string }> { + return this.getSubject.asObservable(); + } + async has(key: string): Promise { - return this.backend.has(key); + const isPresent = this.backend.has(key); + this.hasSubject.next({ key, isPresent }); + return isPresent; + } + + public observeHas(): Observable<{ + readonly key: string; + readonly isPresent: boolean; + }> { + return this.hasSubject.asObservable(); } async set(key: string, value: string): Promise { this.backend.set(key, value); + this.setSubject.next({ key, value }); this.prometheusExporter.setTotalKeyCounter(this.backend.size); } + public observeSet(): Observable<{ + readonly key: string; + readonly value: string; + }> { + return this.setSubject.asObservable(); + } + async delete(key: string): Promise { this.backend.delete(key); + this.deleteSubject.next({ key }); this.prometheusExporter.setTotalKeyCounter(this.backend.size); } + + public observeDelete(): Observable<{ + readonly key: string; + }> { + return this.deleteSubject.asObservable(); + } } diff --git a/packages/cactus-plugin-keychain-memory/src/test/typescript/unit/plugin-keychain-memory-observability.test.ts b/packages/cactus-plugin-keychain-memory/src/test/typescript/unit/plugin-keychain-memory-observability.test.ts new file mode 100644 index 0000000000..cb608d540d --- /dev/null +++ b/packages/cactus-plugin-keychain-memory/src/test/typescript/unit/plugin-keychain-memory-observability.test.ts @@ -0,0 +1,84 @@ +import "jest-extended"; +import { v4 as uuidV4 } from "uuid"; + +import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common"; + +import { PluginKeychainMemory } from "../../../main/typescript/public-api"; + +const logLevel: LogLevelDesc = "INFO"; + +describe("PluginKeychainMemory", () => { + const log = LoggerProvider.getOrCreate({ + label: "plugin-keychain-memory-observability.test.ts", + level: logLevel, + }); + + test("can observe set operations", async () => { + const keychain = new PluginKeychainMemory({ + instanceId: uuidV4(), + keychainId: uuidV4(), + logLevel, + }); + + let getCount = 0; + const stratedAt = new Date(); + + const taskPromise = new Promise((resolve) => { + keychain.observeSet().subscribe({ + next: (value) => { + getCount++; + log.debug("NEXT_SET: startedAt=%o value=%o", stratedAt, value); + if (getCount >= 5) { + resolve(); + } + }, + }); + + keychain.set("some-key-that-does-not-matter-1", uuidV4()); + keychain.set("some-key-that-does-not-matter-2", uuidV4()); + keychain.set("some-key-that-does-not-matter-3", uuidV4()); + keychain.set("some-key-that-does-not-matter-4", uuidV4()); + keychain.set("some-key-that-does-not-matter-5", uuidV4()); + }); + await expect(taskPromise).toResolve(); + }, 500); + + test("can observe set operations with buffer", async () => { + const keychain = new PluginKeychainMemory({ + instanceId: uuidV4(), + keychainId: uuidV4(), + logLevel, + observabilityBufferSize: 5, + observabilityTtlSeconds: 1000, + }); + + let getCount = 0; + const stratedAt = new Date(); + + keychain.set("some-key-that-does-not-matter-1", uuidV4()); + keychain.set("some-key-that-does-not-matter-2", uuidV4()); + keychain.set("some-key-that-does-not-matter-3", uuidV4()); + keychain.set("some-key-that-does-not-matter-4", uuidV4()); + keychain.set("some-key-that-does-not-matter-5", uuidV4()); + + const taskPromise = new Promise((resolve) => { + keychain.observeSet().subscribe({ + next: (value) => { + getCount++; + log.debug("NEXT_SET_1: startedAt=%o value=%o", stratedAt, value); + }, + }); + keychain.observeSet().subscribe({ + next: (value) => { + getCount++; + log.debug("NEXT_SET_2: startedAt=%o value=%o", stratedAt, value); + if (getCount >= 10) { + resolve(); + } + }, + }); + }); + + await expect(taskPromise).toResolve(); + }, 500); +}); diff --git a/yarn.lock b/yarn.lock index 43d32123fc..ab3a56345e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8457,6 +8457,7 @@ __metadata: express: "npm:4.19.2" npm-run-all2: "npm:6.1.2" prom-client: "npm:13.2.0" + rxjs: "npm:7.8.1" uuid: "npm:9.0.1" languageName: unknown linkType: soft