From b0d426c37c60c677ff1011f5a6da3814fcbe66da Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Wed, 7 May 2025 11:00:33 -0500 Subject: [PATCH 1/4] chore: adds LDTransactionalFeatureStore, LDTransactionalDataSourceUpdates, and FDv2 DataSource impls. --- .../data_sources/DataSourceUpdates.test.ts | 19 +- .../OneShotInitializerFDv2.test.ts | 77 ++++++ .../data_sources/PollingProcessor.test.ts | 33 +-- .../data_sources/PollingProcessorFDv2.test.ts | 233 ++++++++++++++++++ .../createPayloadListenersFDv2.test.ts | 5 +- .../store/TransactionalFeatureStore.test.ts | 122 +++++++++ .../shared/sdk-server/src/LDClientImpl.ts | 3 +- packages/shared/sdk-server/src/api/index.ts | 1 + .../LDTransactionalDataSourceUpdates.ts | 69 ++++++ .../subsystems/LDTransactionalFeatureStore.ts | 160 ++++++++++++ .../sdk-server/src/api/subsystems/index.ts | 5 +- .../data_sources/OneShotInitializerFDv2.ts | 100 ++++++++ .../src/data_sources/PollingProcessor.ts | 13 +- .../src/data_sources/PollingProcessorFDv2.ts | 155 ++++++++++++ .../TransactionalDataSourceUpdates.ts | 193 +++++++++++++++ .../store/AsyncTransactionalStoreFacade.ts | 82 ++++++ .../src/store/InMemoryFeatureStore.ts | 91 +++++-- .../src/store/TransactionalFeatureStore.ts | 114 +++++++++ packages/shared/sdk-server/src/store/index.ts | 10 +- 19 files changed, 1413 insertions(+), 72 deletions(-) create mode 100644 packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts create mode 100644 packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts create mode 100644 packages/shared/sdk-server/__tests__/store/TransactionalFeatureStore.test.ts create mode 100644 packages/shared/sdk-server/src/api/subsystems/LDTransactionalDataSourceUpdates.ts create mode 100644 packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts create mode 100644 packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts create mode 100644 packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts create mode 100644 packages/shared/sdk-server/src/data_sources/TransactionalDataSourceUpdates.ts create mode 100644 packages/shared/sdk-server/src/store/AsyncTransactionalStoreFacade.ts create mode 100644 packages/shared/sdk-server/src/store/TransactionalFeatureStore.ts diff --git a/packages/shared/sdk-server/__tests__/data_sources/DataSourceUpdates.test.ts b/packages/shared/sdk-server/__tests__/data_sources/DataSourceUpdates.test.ts index 401589febd..414d339448 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/DataSourceUpdates.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/DataSourceUpdates.test.ts @@ -2,9 +2,9 @@ import { AsyncQueue } from 'launchdarkly-js-test-helpers'; import { internal } from '@launchdarkly/js-sdk-common'; -import { LDFeatureStore } from '../../src/api/subsystems'; +import { LDTransactionalFeatureStore } from '../../src/api/subsystems'; import promisify from '../../src/async/promisify'; -import DataSourceUpdates from '../../src/data_sources/DataSourceUpdates'; +import DataSourceUpdates from '../../src/data_sources/TransactionalDataSourceUpdates'; import InMemoryFeatureStore from '../../src/store/InMemoryFeatureStore'; import VersionedDataKinds from '../../src/store/VersionedDataKinds'; @@ -13,21 +13,28 @@ type InitMetadata = internal.InitMetadata; it('passes initialization metadata to underlying feature store', () => { const metadata: InitMetadata = { environmentId: '12345' }; const store = new InMemoryFeatureStore(); - store.init = jest.fn(); + store.applyChanges = jest.fn(); const updates = new DataSourceUpdates( store, () => false, () => {}, ); updates.init({}, () => {}, metadata); - expect(store.init).toHaveBeenCalledTimes(1); - expect(store.init).toHaveBeenNthCalledWith(1, expect.any(Object), expect.any(Function), metadata); + expect(store.applyChanges).toHaveBeenCalledTimes(1); + expect(store.applyChanges).toHaveBeenNthCalledWith( + 1, + true, + expect.any(Object), + expect.any(Function), + metadata, + undefined, + ); }); describe.each([true, false])( 'given a DataSourceUpdates with in memory store and change listeners: %s', (listen) => { - let store: LDFeatureStore; + let store: LDTransactionalFeatureStore; let updates: DataSourceUpdates; const queue = new AsyncQueue(); diff --git a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts new file mode 100644 index 0000000000..9bfa3fa868 --- /dev/null +++ b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts @@ -0,0 +1,77 @@ +import { subsystem } from '../../src'; +import OneShotInitializerFDv2 from '../../src/data_sources/OneShotInitializerFDv2'; +import Requestor from '../../src/data_sources/Requestor'; +import TestLogger from '../Logger'; + +describe('given a one shot initializer', () => { + const requestor = { + requestAllData: jest.fn(), + }; + const allEvents = { + events: [ + { + event: 'server-intent', + data: { payloads: [{ code: 'xfer-full', id: 'mockId' }] }, + }, + { + event: 'put-object', + data: { + kind: 'flag', + key: 'flagA', + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + }, + { + event: 'payload-transferred', + data: { state: 'mockState', version: 1 }, + }, + ], + }; + const jsonData = JSON.stringify(allEvents); + + let initializer: OneShotInitializerFDv2; + const mockDataCallback = jest.fn(); + const mockStatusCallback = jest.fn(); + let testLogger: TestLogger; + + beforeEach(() => { + testLogger = new TestLogger(); + initializer = new OneShotInitializerFDv2(requestor as unknown as Requestor, testLogger); + }); + + afterEach(() => { + initializer.stop(); + jest.restoreAllMocks(); + }); + + it('makes no requests before being started', () => { + expect(requestor.requestAllData).not.toHaveBeenCalled(); + }); + + it('polls immediately on start', () => { + initializer.start(mockDataCallback, mockStatusCallback); + expect(requestor.requestAllData).toHaveBeenCalledTimes(1); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + }); + + it('calls callback on success', () => { + requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); + initializer.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).toHaveBeenNthCalledWith(1, true, { + basis: true, + id: `mockId`, + state: `mockState`, + updates: [ + { + kind: `flag`, + key: `flagA`, + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + ], + version: 1, + }); + }); +}); diff --git a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessor.test.ts b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessor.test.ts index 1e43688301..9d25a142b5 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessor.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessor.test.ts @@ -1,13 +1,9 @@ -import { ClientContext } from '@launchdarkly/js-sdk-common'; - import { LDFeatureStore } from '../../src'; import PollingProcessor from '../../src/data_sources/PollingProcessor'; import Requestor from '../../src/data_sources/Requestor'; -import Configuration from '../../src/options/Configuration'; import AsyncStoreFacade from '../../src/store/AsyncStoreFacade'; import InMemoryFeatureStore from '../../src/store/InMemoryFeatureStore'; import VersionedDataKinds from '../../src/store/VersionedDataKinds'; -import { createBasicPlatform } from '../createBasicPlatform'; import TestLogger, { LogLevel } from '../Logger'; describe('given an event processor', () => { @@ -23,24 +19,19 @@ describe('given an event processor', () => { let store: LDFeatureStore; let storeFacade: AsyncStoreFacade; - let config: Configuration; let processor: PollingProcessor; let initSuccessHandler: jest.Mock; beforeEach(() => { store = new InMemoryFeatureStore(); storeFacade = new AsyncStoreFacade(store); - config = new Configuration({ - featureStore: store, - pollInterval: longInterval, - logger: new TestLogger(), - }); initSuccessHandler = jest.fn(); processor = new PollingProcessor( - config, requestor as unknown as Requestor, - config.featureStoreFactory(new ClientContext('', config, createBasicPlatform())), + longInterval, + store, + new TestLogger(), initSuccessHandler, ); }); @@ -99,27 +90,22 @@ describe('given a polling processor with a short poll duration', () => { const jsonData = JSON.stringify(allData); let store: LDFeatureStore; - let config: Configuration; + let testLogger: TestLogger; let processor: PollingProcessor; let initSuccessHandler: jest.Mock; let errorHandler: jest.Mock; beforeEach(() => { store = new InMemoryFeatureStore(); - config = new Configuration({ - featureStore: store, - pollInterval: shortInterval, - logger: new TestLogger(), - }); + testLogger = new TestLogger(); initSuccessHandler = jest.fn(); errorHandler = jest.fn(); - // Configuration will not let us set this as low as needed for the test. - Object.defineProperty(config, 'pollInterval', { value: 0.1 }); processor = new PollingProcessor( - config, requestor as unknown as Requestor, - config.featureStoreFactory(new ClientContext('', config, createBasicPlatform())), + shortInterval, + store, + testLogger, initSuccessHandler, errorHandler, ); @@ -158,7 +144,6 @@ describe('given a polling processor with a short poll duration', () => { expect(errorHandler).not.toBeCalled(); setTimeout(() => { expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); - const testLogger = config.logger as TestLogger; expect(testLogger.getCount(LogLevel.Error)).toBe(0); expect(testLogger.getCount(LogLevel.Warn)).toBeGreaterThan(2); (done as jest.DoneCallback)(); @@ -176,7 +161,6 @@ describe('given a polling processor with a short poll duration', () => { setTimeout(() => { expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); - const testLogger = config.logger as TestLogger; expect(testLogger.getCount(LogLevel.Error)).toBeGreaterThan(2); (done as jest.DoneCallback)(); }, 300); @@ -199,7 +183,6 @@ describe('given a polling processor with a short poll duration', () => { setTimeout(() => { expect(requestor.requestAllData.mock.calls.length).toBe(1); - const testLogger = config.logger as TestLogger; expect(testLogger.getCount(LogLevel.Error)).toBe(1); (done as jest.DoneCallback)(); }, 300); diff --git a/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts new file mode 100644 index 0000000000..3cadd04e28 --- /dev/null +++ b/packages/shared/sdk-server/__tests__/data_sources/PollingProcessorFDv2.test.ts @@ -0,0 +1,233 @@ +import { DataSourceErrorKind, LDPollingError, subsystem } from '../../src'; +import PollingProcessorFDv2 from '../../src/data_sources/PollingProcessorFDv2'; +import Requestor from '../../src/data_sources/Requestor'; +import TestLogger, { LogLevel } from '../Logger'; + +describe('given an event processor', () => { + const requestor = { + requestAllData: jest.fn(), + }; + const longInterval = 100000; + const allEvents = { + events: [ + { + event: 'server-intent', + data: { payloads: [{ code: 'xfer-full', id: 'mockId' }] }, + }, + { + event: 'put-object', + data: { + kind: 'flag', + key: 'flagA', + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + }, + { + event: 'payload-transferred', + data: { state: 'mockState', version: 1 }, + }, + ], + }; + const jsonData = JSON.stringify(allEvents); + + let processor: PollingProcessorFDv2; + const mockDataCallback = jest.fn(); + const mockStatusCallback = jest.fn(); + + beforeEach(() => { + processor = new PollingProcessorFDv2( + requestor as unknown as Requestor, + longInterval, + new TestLogger(), + ); + }); + + afterEach(() => { + processor.stop(); + jest.restoreAllMocks(); + }); + + it('makes no requests before being started', () => { + expect(requestor.requestAllData).not.toHaveBeenCalled(); + }); + + it('polls immediately on start', () => { + processor.start(mockDataCallback, mockStatusCallback); + expect(requestor.requestAllData).toHaveBeenCalledTimes(1); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + }); + + it('calls callback on success', async () => { + requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); + let dataCallback; + await new Promise((resolve) => { + dataCallback = jest.fn(() => { + resolve(); + }); + + processor.start(dataCallback, mockStatusCallback); + }); + + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { + basis: true, + id: `mockId`, + state: `mockState`, + updates: [ + { + kind: `flag`, + key: `flagA`, + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + ], + version: 1, + }); + }); +}); + +describe('given a polling processor with a short poll duration', () => { + const requestor = { + requestAllData: jest.fn(), + }; + const shortInterval = 0.1; + const allEvents = { + events: [ + { + event: 'server-intent', + data: { payloads: [{ code: 'xfer-full', id: 'mockId' }] }, + }, + { + event: 'put-object', + data: { + kind: 'flag', + key: 'flagA', + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + }, + { + event: 'payload-transferred', + data: { state: 'mockState', version: 1 }, + }, + ], + }; + const jsonData = JSON.stringify(allEvents); + + let testLogger: TestLogger; + let processor: PollingProcessorFDv2; + const mockDataCallback = jest.fn(); + const mockStatusCallback = jest.fn(); + + beforeEach(() => { + testLogger = new TestLogger(); + + processor = new PollingProcessorFDv2( + requestor as unknown as Requestor, + shortInterval, + testLogger, + ); + }); + + afterEach(() => { + processor.stop(); + jest.resetAllMocks(); + }); + + it('polls repeatedly', (done) => { + requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); + + processor.start(mockDataCallback, mockStatusCallback); + setTimeout(() => { + expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(4); + done(); + }, 500); + }); + + it.each([400, 408, 429, 500, 503])( + 'continues polling after recoverable error', + (status, done) => { + requestor.requestAllData = jest.fn((cb) => + cb( + { + status, + }, + undefined, + ), + ); + + processor.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Interrupted, + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + `Received error ${status} for polling request - will retry`, + status as number, + ), + ); + setTimeout(() => { + expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(testLogger.getCount(LogLevel.Error)).toBe(0); + expect(testLogger.getCount(LogLevel.Warn)).toBeGreaterThan(2); + (done as jest.DoneCallback)(); + }, 300); + }, + ); + + it('continues polling after receiving invalid JSON', (done) => { + requestor.requestAllData = jest.fn((cb) => cb(undefined, '{sad')); + + processor.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Interrupted, + new LDPollingError(DataSourceErrorKind.ErrorResponse, `Malformed data in polling response`), + ); + + setTimeout(() => { + expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(testLogger.getCount(LogLevel.Error)).toBeGreaterThan(2); + (done as jest.DoneCallback)(); + }, 300); + }); + + it.each([401, 403])( + 'does not continue after non-recoverable error', + (status, done) => { + requestor.requestAllData = jest.fn((cb) => + cb( + { + status, + }, + undefined, + ), + ); + processor.start(mockDataCallback, mockStatusCallback); + expect(mockDataCallback).not.toHaveBeenCalled(); + expect(mockStatusCallback).toHaveBeenNthCalledWith(1, subsystem.DataSourceState.Initializing); + expect(mockStatusCallback).toHaveBeenNthCalledWith( + 2, + subsystem.DataSourceState.Closed, + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + status === 401 + ? `Received error ${status} (invalid SDK key) for polling request - giving up permanently` + : `Received error ${status} for polling request - giving up permanently`, + status as number, + false, + ), + ); + setTimeout(() => { + expect(requestor.requestAllData.mock.calls.length).toBe(1); + expect(testLogger.getCount(LogLevel.Error)).toBe(1); + (done as jest.DoneCallback)(); + }, 300); + }, + ); +}); diff --git a/packages/shared/sdk-server/__tests__/data_sources/createPayloadListenersFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/createPayloadListenersFDv2.test.ts index 967eb658f7..adffa94635 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/createPayloadListenersFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/createPayloadListenersFDv2.test.ts @@ -1,6 +1,6 @@ import { LDLogger } from '@launchdarkly/js-sdk-common'; -import { LDDataSourceUpdates } from '../../src/api/subsystems'; +import { LDTransactionalDataSourceUpdates } from '../../src/api/subsystems'; import { createPayloadListener } from '../../src/data_sources/createPayloadListenerFDv2'; jest.mock('../../src/store/serialization'); @@ -81,13 +81,14 @@ const changesTransferPayload = { }; describe('createPayloadListenerFDv2', () => { - let dataSourceUpdates: LDDataSourceUpdates; + let dataSourceUpdates: LDTransactionalDataSourceUpdates; let basisRecieved: jest.Mock; beforeEach(() => { dataSourceUpdates = { init: jest.fn(), upsert: jest.fn(), + applyChanges: jest.fn(), }; basisRecieved = jest.fn(); }); diff --git a/packages/shared/sdk-server/__tests__/store/TransactionalFeatureStore.test.ts b/packages/shared/sdk-server/__tests__/store/TransactionalFeatureStore.test.ts new file mode 100644 index 0000000000..b853d6ef41 --- /dev/null +++ b/packages/shared/sdk-server/__tests__/store/TransactionalFeatureStore.test.ts @@ -0,0 +1,122 @@ +import { LDTransactionalFeatureStore } from '../../src/api/subsystems'; +import AsyncTransactionalStoreFacade from '../../src/store/AsyncTransactionalStoreFacade'; +import InMemoryFeatureStore from '../../src/store/InMemoryFeatureStore'; +import TransactionalFeatureStore from '../../src/store/TransactionalFeatureStore'; +import VersionedDataKinds from '../../src/store/VersionedDataKinds'; + +describe('given a non transactional store', () => { + let mockNontransactionalStore: LDTransactionalFeatureStore; + let transactionalStore: TransactionalFeatureStore; + + let nonTransactionalFacade: AsyncTransactionalStoreFacade; + let transactionalFacade: AsyncTransactionalStoreFacade; + + beforeEach(() => { + mockNontransactionalStore = new InMemoryFeatureStore(); + transactionalStore = new TransactionalFeatureStore(mockNontransactionalStore); + + // these two facades are used to make test writing easier + nonTransactionalFacade = new AsyncTransactionalStoreFacade(mockNontransactionalStore); + transactionalFacade = new AsyncTransactionalStoreFacade(transactionalStore); + }); + + afterEach(() => { + transactionalFacade.close(); + jest.restoreAllMocks(); + }); + + it('applies changes to non transactional store', async () => { + await transactionalFacade.applyChanges( + false, + { + features: { + key1: { + version: 2, + }, + }, + }, + undefined, + 'selector1', + ); + expect(await nonTransactionalFacade.all(VersionedDataKinds.Features)).toEqual({ + key1: { + key: 'key1', + version: 2, + }, + }); + expect(await transactionalFacade.all(VersionedDataKinds.Features)).toEqual({ + key1: { + key: 'key1', + version: 2, + }, + }); + }); + + it('it reads through to non transactional store before basis is provided', async () => { + await nonTransactionalFacade.init({ + features: { + key1: { + version: 1, + }, + }, + }); + expect(await transactionalFacade.all(VersionedDataKinds.Features)).toEqual({ + key1: { + version: 1, + }, + }); + }); + + it('it switches to memory store when basis is provided', async () => { + // situate some mock data in non transactional store + await nonTransactionalFacade.init({ + features: { + nontransactionalFeature: { + version: 1, + }, + }, + }); + + await transactionalFacade.applyChanges( + true, + { + features: { + key1: { + version: 1, + }, + }, + }, + undefined, + 'selector1', + ); + + expect(await nonTransactionalFacade.all(VersionedDataKinds.Features)).toEqual({ + key1: { + version: 1, + }, + }); + + expect(await transactionalFacade.all(VersionedDataKinds.Features)).toEqual({ + key1: { + version: 1, + }, + }); + + // corrupt non transactional store and then read from transactional store to prove it is not + // using underlying non transactional store for reads + await nonTransactionalFacade.init({ + features: { + nontransactionalFeature: { + version: 1, + }, + }, + }); + + // still should read from memory + expect(await transactionalFacade.all(VersionedDataKinds.Features)).toEqual({ + key1: { + version: 1, + }, + }); + }); +}); diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index 0b1d79901e..6d9096b54c 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -232,9 +232,10 @@ export default class LDClientImpl implements LDClient { this._config.streamInitialReconnectDelay, ) : new PollingProcessor( - config, new Requestor(config, this._platform.requests, baseHeaders), + config.pollInterval, dataSourceUpdates, + config.logger, () => this._initSuccess(), (e) => this._dataSourceErrorHandler(e), ); diff --git a/packages/shared/sdk-server/src/api/index.ts b/packages/shared/sdk-server/src/api/index.ts index 70069cc32f..2e656ad5a9 100644 --- a/packages/shared/sdk-server/src/api/index.ts +++ b/packages/shared/sdk-server/src/api/index.ts @@ -4,6 +4,7 @@ export * from './LDClient'; export * from './LDMigration'; export * from './interfaces/DataKind'; export * from './subsystems/LDFeatureStore'; +export * from './subsystems/LDTransactionalFeatureStore'; export * from './LDWaitForInitializationOptions'; // These are items that should be less frequently used, and therefore they diff --git a/packages/shared/sdk-server/src/api/subsystems/LDTransactionalDataSourceUpdates.ts b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalDataSourceUpdates.ts new file mode 100644 index 0000000000..a60d878610 --- /dev/null +++ b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalDataSourceUpdates.ts @@ -0,0 +1,69 @@ +import { internal } from '@launchdarkly/js-sdk-common'; + +import { DataKind } from '../interfaces'; +import { LDFeatureStoreDataStorage, LDKeyedFeatureStoreItem } from './LDFeatureStore'; + +type InitMetadata = internal.InitMetadata; + +/** + * Interface that a data source implementation will use to push data into the SDK. + * + * The data source interacts with this object, rather than manipulating the data store directly, so + * that the SDK can perform any other necessary operations that must happen when data is updated. + */ +export interface LDTransactionalDataSourceUpdates { + /** + * Completely overwrites the current contents of the data store with a set of items for each + * collection. + * + * @param allData + * An object in which each key is the "namespace" of a collection (e.g. `"features"`) and + * the value is an object that maps keys to entities. The actual type of this parameter is + * `interfaces.FullDataSet`. + * + * @param callback + * Will be called when the store has been initialized. + * + * @param initMetadata + * Optional metadata to initialize the data source with. + */ + init(allData: LDFeatureStoreDataStorage, callback: () => void, initMetadata?: InitMetadata): void; + + /** + * Updates or inserts an item in the specified collection. For updates, the object will only be + * updated if the existing version is less than the new version. + * + * @param kind + * The type of data to be accessed. The actual type of this parameter is + * {@link interfaces.DataKind}. + * + * @param data + * The contents of the entity, as an object that can be converted to JSON. The store + * should check the `version` property of this object, and should *not* overwrite any + * existing data if the existing `version` is greater than or equal to that value. + * The actual type of this parameter is {@link interfaces.VersionedData}. + * + * @param callback + * Will be called after the upsert operation is complete. + */ + upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void; + + /** + * @param basis If true, completely overwrites the current contents of the data store + * with the provided data. If false, upserts the items in the provided data. Upserts + * are made only if provided items have newer versions than existing items. + * @param data An object in which each key is the "namespace" of a collection (e.g. `"features"`) and + * the value is an object that maps keys to entities. The actual type of this parameter is + * `interfaces.FullDataSet`. + * @param callback Will be called after the changes are applied. + * @param initMetadata Optional metadata to initialize the data source with. + * @param selector opaque string that uniquely identifies the state that contains the changes + */ + applyChanges( + basis: boolean, + data: LDFeatureStoreDataStorage, + callback: () => void, + initMetadata?: InitMetadata, + selector?: String, + ): void; +} diff --git a/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts new file mode 100644 index 0000000000..fd3542c8d8 --- /dev/null +++ b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts @@ -0,0 +1,160 @@ +import { internal } from '@launchdarkly/js-sdk-common'; + +import { DataKind } from '../interfaces'; +import { + LDFeatureStoreDataStorage, + LDFeatureStoreItem, + LDFeatureStoreKindData, + LDKeyedFeatureStoreItem, +} from './LDFeatureStore'; + +type InitMetadata = internal.InitMetadata; + +/** + * Transactional version of {@link LDFeatureStore} with support for {@link applyChanges} + */ +export interface LDTransactionalFeatureStore { + /** + * Get an entity from the store. + * + * The store should treat any entity with the property `deleted: true` as "not found". + * + * @param kind + * The type of data to be accessed. The store should not make any assumptions about the format + * of the data, but just return a JSON object. The actual type of this parameter is + * {@link interfaces.DataKind}. + * + * @param key + * The unique key of the entity within the specified collection. + * + * @param callback + * Will be called with the retrieved entity, or null if not found. The actual type of the result + * value is {@link interfaces.VersionedData}. + */ + get(kind: DataKind, key: string, callback: (res: LDFeatureStoreItem | null) => void): void; + + /** + * Get all entities from a collection. + * + * The store should filter out any entities with the property `deleted: true`. + * + * @param kind + * The type of data to be accessed. The store should not make any assumptions about the format + * of the data, but just return an object in which each key is the `key` property of an entity + * and the value is the entity. The actual type of this parameter is + * {@link interfaces.DataKind}. + * + * @param callback + * Will be called with the resulting map. The actual type of the result value is + * `interfaces.KeyedItems`. + */ + all(kind: DataKind, callback: (res: LDFeatureStoreKindData) => void): void; + + /** + * Initialize the store, overwriting any existing data. + * + * @param allData + * An object in which each key is the "namespace" of a collection (e.g. `"features"`) and + * the value is an object that maps keys to entities. The actual type of this parameter is + * `interfaces.FullDataSet`. + * + * @param callback + * Will be called when the store has been initialized. + * + * @param initMetadata + * Optional metadata to initialize the feature store with. + */ + init(allData: LDFeatureStoreDataStorage, callback: () => void, initMetadata?: InitMetadata): void; + + /** + * Delete an entity from the store. + * + * Deletion should be implemented by storing a placeholder object with the property + * `deleted: true` and a `version` property equal to the provided version. In other words, + * it should be exactly the same as calling `upsert` with such an object. + * + * @param kind + * The type of data to be accessed. The actual type of this parameter is + * {@link interfaces.DataKind}. + * + * @param key + * The unique key of the entity within the specified collection. + * + * @param version + * A number that must be greater than the `version` property of the existing entity in + * order for it to be deleted. If it is less than or equal to the existing version, the + * method should do nothing. + * + * @param callback + * Will be called when the delete operation is complete. + */ + delete(kind: DataKind, key: string, version: number, callback: () => void): void; + + /** + * Add an entity or update an existing entity. + * + * @param kind + * The type of data to be accessed. The actual type of this parameter is + * {@link interfaces.DataKind}. + * + * @param data + * The contents of the entity, as an object that can be converted to JSON. The store + * should check the `version` property of this object, and should *not* overwrite any + * existing data if the existing `version` is greater than or equal to that value. + * The actual type of this parameter is {@link interfaces.VersionedData}. + * + * @param callback + * Will be called after the upsert operation is complete. + */ + upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void; + + /** + * Applies the provided data onto the existing data, replacing all data or upserting depending + * on the basis parameter. Must call {@link applyChanges} providing basis before calling {@link applyChanges} + * that is not a basis. + * + * @param basis If true, completely overwrites the current contents of the data store + * with the provided data. If false, upserts the items in the provided data. Upserts + * are made only if provided items have newer versions than existing items. + * @param data An object in which each key is the "namespace" of a collection (e.g. `"features"`) and + * the value is an object that maps keys to entities. The actual type of this parameter is + * `interfaces.FullDataSet`. + * @param callback Will be called after the changes are applied. + * @param initMetadata Optional metadata to initialize the feature store with. + * @param selector opaque string that uniquely identifies the state that contains the changes + */ + applyChanges( + basis: boolean, + data: LDFeatureStoreDataStorage, + callback: () => void, + initMetadata?: internal.InitMetadata, + selector?: String, + ): void; + + /** + * Tests whether the store is initialized. + * + * "Initialized" means that the store has been populated with data, either by the client + * having called `init()` within this process, or by another process (if this is a shared + * database). + * + * @param callback + * Will be called back with the boolean result. + */ + initialized(callback: (isInitialized: boolean) => void): void; + + /** + * Releases any resources being used by the feature store. + */ + close(): void; + + /** + * Get a description of the store. + */ + getDescription?(): string; + + /** + * Get the initialization metadata of the store. + */ + getInitMetaData?(): InitMetadata | undefined; +} diff --git a/packages/shared/sdk-server/src/api/subsystems/index.ts b/packages/shared/sdk-server/src/api/subsystems/index.ts index 4e21d27949..1c99b80b67 100644 --- a/packages/shared/sdk-server/src/api/subsystems/index.ts +++ b/packages/shared/sdk-server/src/api/subsystems/index.ts @@ -1,3 +1,6 @@ +export * from './LDDataSourceUpdates'; export * from './LDFeatureRequestor'; export * from './LDFeatureStore'; -export * from './LDDataSourceUpdates'; +export * from './LDTransactionalDataSourceUpdates'; +export * from './LDTransactionalFeatureStore'; + diff --git a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts new file mode 100644 index 0000000000..d511361228 --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts @@ -0,0 +1,100 @@ +import { + DataSourceErrorKind, + httpErrorMessage, + internal, + LDLogger, + LDPollingError, + subsystem as subsystemCommon, +} from '@launchdarkly/js-sdk-common'; + +import { Flag } from '../evaluation/data/Flag'; +import { Segment } from '../evaluation/data/Segment'; +import { processFlag, processSegment } from '../store/serialization'; +import Requestor from './Requestor'; + +/** + * @internal + */ +export default class OneShotInitializerFDv2 implements subsystemCommon.DataSource { + constructor( + private readonly _requestor: Requestor, + private readonly _logger?: LDLogger, + ) {} + + start( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, + ) { + statusCallback(subsystemCommon.DataSourceState.Initializing); + + this._logger?.debug('Performing initialization request to LaunchDarkly for feature flag data.'); + this._requestor.requestAllData((err, body) => { + if (err) { + const { status } = err; + const message = httpErrorMessage(err, 'initializer', 'initializer does not retry'); + this._logger?.error(message); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), + ); + return; + } + + if (!body) { + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError( + DataSourceErrorKind.InvalidData, + 'One shot initializer response missing body.', + ), + ); + return; + } + + try { + const parsed = JSON.parse(body) as internal.FDv2EventsCollection; + const payloadProcessor = new internal.PayloadProcessor( + { + flag: (flag: Flag) => { + processFlag(flag); + return flag; + }, + segment: (segment: Segment) => { + processSegment(segment); + return segment; + }, + }, + (errorKind: DataSourceErrorKind, message: string) => { + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError(errorKind, message), + ); + }, + this._logger, + ); + + statusCallback(subsystemCommon.DataSourceState.Valid); + + payloadProcessor.addPayloadListener((payload) => { + dataCallback(payload.basis, payload); + }); + + payloadProcessor.processEvents(parsed.events); + + statusCallback(subsystemCommon.DataSourceState.Closed); + } catch (error: any) { + // We could not parse this JSON. Report the problem. + this._logger?.error('Response contained invalid data'); + this._logger?.debug(`${err} - Body follows: ${body}`); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.InvalidData, 'Malformed data in polling response'), + ); + } + }); + } + + stop() { + // no-op since requestor has no cancellation support + } +} diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessor.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessor.ts index 07ef5bf9f0..2d0ff5230b 100644 --- a/packages/shared/sdk-server/src/data_sources/PollingProcessor.ts +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessor.ts @@ -10,7 +10,6 @@ import { } from '@launchdarkly/js-sdk-common'; import { LDDataSourceUpdates } from '../api/subsystems'; -import Configuration from '../options/Configuration'; import { deserializePoll } from '../store'; import VersionedDataKinds from '../store/VersionedDataKinds'; import Requestor from './Requestor'; @@ -25,22 +24,16 @@ const { initMetadataFromHeaders } = internal; export default class PollingProcessor implements subsystem.LDStreamProcessor { private _stopped = false; - private _logger?: LDLogger; - - private _pollInterval: number; - private _timeoutHandle: any; constructor( - config: Configuration, private readonly _requestor: Requestor, + private readonly _pollInterval: number, private readonly _featureStore: LDDataSourceUpdates, + private readonly _logger?: LDLogger, private readonly _initSuccessHandler: VoidFunction = () => {}, private readonly _errorHandler?: PollingErrorHandler, - ) { - this._logger = config.logger; - this._pollInterval = config.pollInterval; - } + ) {} private _poll() { if (this._stopped) { diff --git a/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts new file mode 100644 index 0000000000..dc0f9da02f --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/PollingProcessorFDv2.ts @@ -0,0 +1,155 @@ +import { + DataSourceErrorKind, + httpErrorMessage, + internal, + isHttpRecoverable, + LDLogger, + LDPollingError, + subsystem as subsystemCommon, +} from '@launchdarkly/js-sdk-common'; + +import { Flag } from '../evaluation/data/Flag'; +import { Segment } from '../evaluation/data/Segment'; +import { processFlag, processSegment } from '../store/serialization'; +import Requestor from './Requestor'; + +export type PollingErrorHandler = (err: LDPollingError) => void; + +/** + * @internal + */ +export default class PollingProcessorFDv2 implements subsystemCommon.DataSource { + private _stopped = false; + private _timeoutHandle: any; + + private _statusCallback?: (status: subsystemCommon.DataSourceState, err?: any) => void; + + constructor( + private readonly _requestor: Requestor, + private readonly _pollInterval: number = 30, + private readonly _logger?: LDLogger, + ) {} + + private _poll( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, + ) { + if (this._stopped) { + return; + } + + const startTime = Date.now(); + this._logger?.debug('Polling LaunchDarkly for feature flag updates'); + this._requestor.requestAllData((err, body) => { + const elapsed = Date.now() - startTime; + const sleepFor = Math.max(this._pollInterval * 1000 - elapsed, 0); + + this._logger?.debug('Elapsed: %d ms, sleeping for %d ms', elapsed, sleepFor); + if (err) { + const { status } = err; + if (status && !isHttpRecoverable(status)) { + const message = httpErrorMessage(err, 'polling request'); + this._logger?.error(message); + statusCallback( + subsystemCommon.DataSourceState.Closed, + new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status, false), + ); + // It is not recoverable, return and do not trigger another poll. + return; + } + + const message = httpErrorMessage(err, 'polling request', 'will retry'); + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError(DataSourceErrorKind.ErrorResponse, message, status), + ); + this._logger?.warn(message); + // schedule poll + this._timeoutHandle = setTimeout(() => { + this._poll(dataCallback, statusCallback); + }, sleepFor); + return; + } + + if (!body) { + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + 'Response missing body, will retry.', + ), + ); + // schedule poll + this._timeoutHandle = setTimeout(() => { + this._poll(dataCallback, statusCallback); + }, sleepFor); + return; + } + + try { + const parsed = JSON.parse(body) as internal.FDv2EventsCollection; + const payloadProcessor = new internal.PayloadProcessor( + { + flag: (flag: Flag) => { + processFlag(flag); + return flag; + }, + segment: (segment: Segment) => { + processSegment(segment); + return segment; + }, + }, + (errorKind: DataSourceErrorKind, message: string) => { + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError(errorKind, message), + ); + }, + this._logger, + ); + + payloadProcessor.addPayloadListener((payload) => { + dataCallback(payload.basis, payload); + }); + + payloadProcessor.processEvents(parsed.events); + + // TODO: SDK-855 implement blocking duplicate data source state events in DataAvailability API + statusCallback(subsystemCommon.DataSourceState.Valid); + } catch { + // We could not parse this JSON. Report the problem and fallthrough to + // start another poll. + this._logger?.error('Response contained invalid data'); + this._logger?.debug(`${err} - Body follows: ${body}`); + statusCallback( + subsystemCommon.DataSourceState.Interrupted, + new LDPollingError(DataSourceErrorKind.InvalidData, 'Malformed data in polling response'), + ); + } + + // schedule poll + this._timeoutHandle = setTimeout(() => { + this._poll(dataCallback, statusCallback); + }, sleepFor); + }); + } + + start( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: subsystemCommon.DataSourceState, err?: any) => void, + ) { + this._statusCallback = statusCallback; // hold reference for usage in stop() + statusCallback(subsystemCommon.DataSourceState.Initializing); + this._poll(dataCallback, statusCallback); + } + + stop() { + if (this._timeoutHandle) { + clearTimeout(this._timeoutHandle); + this._timeoutHandle = undefined; + } + this._statusCallback?.(subsystemCommon.DataSourceState.Closed); + this._stopped = true; + this._statusCallback = undefined; + } +} diff --git a/packages/shared/sdk-server/src/data_sources/TransactionalDataSourceUpdates.ts b/packages/shared/sdk-server/src/data_sources/TransactionalDataSourceUpdates.ts new file mode 100644 index 0000000000..fea448ed7f --- /dev/null +++ b/packages/shared/sdk-server/src/data_sources/TransactionalDataSourceUpdates.ts @@ -0,0 +1,193 @@ +import { internal } from '@launchdarkly/js-sdk-common'; + +import { DataKind } from '../api/interfaces'; +import { + LDDataSourceUpdates, + LDFeatureStoreDataStorage, + LDFeatureStoreItem, + LDKeyedFeatureStoreItem, + LDTransactionalFeatureStore, +} from '../api/subsystems'; +import { Clause } from '../evaluation/data/Clause'; +import { Flag } from '../evaluation/data/Flag'; +import { Prerequisite } from '../evaluation/data/Prerequisite'; +import VersionedDataKinds from '../store/VersionedDataKinds'; +import DependencyTracker from './DependencyTracker'; +import NamespacedDataSet from './NamespacedDataSet'; + +type InitMetadata = internal.InitMetadata; + +/** + * This type allows computing the clause dependencies of either a flag or a segment. + */ +interface TypeWithRuleClauses { + prerequisites?: Prerequisite[]; + rules?: [ + { + // The shape of rules are different between flags and segments, but + // both have clauses of the same shape. + clauses?: Clause[]; + }, + ]; +} + +function computeDependencies(namespace: string, item: LDFeatureStoreItem) { + const ret = new NamespacedDataSet(); + const isFlag = namespace === VersionedDataKinds.Features.namespace; + const isSegment = namespace === VersionedDataKinds.Segments.namespace; + if (isFlag) { + const flag = item as Flag; + flag?.prerequisites?.forEach((prereq) => { + ret.set(namespace, prereq.key, true); + }); + } + + if (isFlag || isSegment) { + const itemWithRuleClauses = item as TypeWithRuleClauses; + + itemWithRuleClauses?.rules?.forEach((rule) => { + rule.clauses?.forEach((clause) => { + if (clause.op === 'segmentMatch') { + clause.values.forEach((value) => { + ret.set(VersionedDataKinds.Segments.namespace, value, true); + }); + } + }); + }); + } + return ret; +} + +/** + * @internal + */ +export default class DataSourceUpdates implements LDDataSourceUpdates { + private readonly _dependencyTracker = new DependencyTracker(); + + constructor( + private readonly _featureStore: LDTransactionalFeatureStore, + private readonly _hasEventListeners: () => boolean, + private readonly _onChange: (key: string) => void, + ) {} + + init( + allData: LDFeatureStoreDataStorage, + callback: () => void, + initMetadata?: InitMetadata, + ): void { + this.applyChanges(true, allData, callback, initMetadata); // basis is true for init + } + + upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void { + this.applyChanges( + false, // basis is false for upserts + { + [kind.namespace]: { + [data.key]: data, + }, + }, + callback, + ); + } + + applyChanges( + basis: boolean, + data: LDFeatureStoreDataStorage, + callback: () => void, + initMetadata?: InitMetadata, + selector?: String, + ): void { + const checkForChanges = this._hasEventListeners(); + const doApplyChanges = (oldData: LDFeatureStoreDataStorage) => { + this._featureStore.applyChanges( + basis, + data, + () => { + // Defer change events so they execute after the callback. + Promise.resolve().then(() => { + if (basis) { + this._dependencyTracker.reset(); + } + + Object.entries(data).forEach(([namespace, items]) => { + Object.keys(items || {}).forEach((key) => { + const item = items[key]; + this._dependencyTracker.updateDependenciesFrom( + namespace, + key, + computeDependencies(namespace, item), + ); + }); + }); + + if (checkForChanges) { + const updatedItems = new NamespacedDataSet(); + Object.keys(data).forEach((namespace) => { + const oldDataForKind = oldData[namespace]; + const newDataForKind = data[namespace]; + let iterateData; + if (basis) { + // for basis, need to iterate on all keys + iterateData = { ...oldDataForKind, ...newDataForKind }; + } else { + // for non basis, only need to iterate on keys in incoming data + iterateData = { ...newDataForKind }; + } + Object.keys(iterateData).forEach((key) => { + this.addIfModified( + namespace, + key, + oldDataForKind && oldDataForKind[key], + newDataForKind && newDataForKind[key], + updatedItems, + ); + }); + }); + + this.sendChangeEvents(updatedItems); + } + }); + callback?.(); + }, + initMetadata, + selector, + ); + }; + + let oldData = {}; + if (checkForChanges) { + // record old data before making changes to use for change calculations + this._featureStore.all(VersionedDataKinds.Features, (oldFlags) => { + this._featureStore.all(VersionedDataKinds.Segments, (oldSegments) => { + oldData = { + [VersionedDataKinds.Features.namespace]: oldFlags, + [VersionedDataKinds.Segments.namespace]: oldSegments, + }; + }); + }); + } + + doApplyChanges(oldData); + } + + addIfModified( + namespace: string, + key: string, + oldValue: LDFeatureStoreItem | null | undefined, + newValue: LDFeatureStoreItem, + toDataSet: NamespacedDataSet, + ) { + if (newValue && oldValue && newValue.version <= oldValue.version) { + return; + } + this._dependencyTracker.updateModifiedItems(toDataSet, namespace, key); + } + + sendChangeEvents(dataSet: NamespacedDataSet) { + dataSet.enumerate((namespace, key) => { + if (namespace === VersionedDataKinds.Features.namespace) { + this._onChange(key); + } + }); + } +} diff --git a/packages/shared/sdk-server/src/store/AsyncTransactionalStoreFacade.ts b/packages/shared/sdk-server/src/store/AsyncTransactionalStoreFacade.ts new file mode 100644 index 0000000000..a93ba887d6 --- /dev/null +++ b/packages/shared/sdk-server/src/store/AsyncTransactionalStoreFacade.ts @@ -0,0 +1,82 @@ +import { internal } from '@launchdarkly/js-sdk-common'; + +import { DataKind } from '../api/interfaces'; +import { + LDFeatureStoreDataStorage, + LDFeatureStoreItem, + LDFeatureStoreKindData, + LDKeyedFeatureStoreItem, + LDTransactionalFeatureStore, +} from '../api/subsystems'; +import promisify from '../async/promisify'; + +type InitMetadata = internal.InitMetadata; + +/** + * Provides an async interface to a feature store. + * + * This allows for using a store using async/await instead of callbacks. + * + */ +export default class AsyncTransactionalStoreFacade { + private _store: LDTransactionalFeatureStore; + + constructor(store: LDTransactionalFeatureStore) { + this._store = store; + } + + async get(kind: DataKind, key: string): Promise { + return promisify((cb) => { + this._store.get(kind, key, cb); + }); + } + + async all(kind: DataKind): Promise { + return promisify((cb) => { + this._store.all(kind, cb); + }); + } + + async init(allData: LDFeatureStoreDataStorage, initMetadata?: InitMetadata): Promise { + return promisify((cb) => { + this._store.init(allData, cb, initMetadata); + }); + } + + async delete(kind: DataKind, key: string, version: number): Promise { + return promisify((cb) => { + this._store.delete(kind, key, version, cb); + }); + } + + async upsert(kind: DataKind, data: LDKeyedFeatureStoreItem): Promise { + return promisify((cb) => { + this._store.upsert(kind, data, cb); + }); + } + + async initialized(): Promise { + return promisify((cb) => { + this._store.initialized(cb); + }); + } + + async applyChanges( + basis: boolean, + data: LDFeatureStoreDataStorage, + initMetadata?: InitMetadata, + selector?: String, + ): Promise { + return promisify((cb) => { + this._store.applyChanges(basis, data, cb, initMetadata, selector); + }); + } + + close(): void { + this._store.close(); + } + + getInitMetadata?(): InitMetadata | undefined { + return this._store.getInitMetaData?.(); + } +} diff --git a/packages/shared/sdk-server/src/store/InMemoryFeatureStore.ts b/packages/shared/sdk-server/src/store/InMemoryFeatureStore.ts index 30d9d6db7c..258990f5ba 100644 --- a/packages/shared/sdk-server/src/store/InMemoryFeatureStore.ts +++ b/packages/shared/sdk-server/src/store/InMemoryFeatureStore.ts @@ -2,38 +2,22 @@ import { internal } from '@launchdarkly/js-sdk-common'; import { DataKind } from '../api/interfaces'; import { - LDFeatureStore, LDFeatureStoreDataStorage, LDFeatureStoreItem, LDFeatureStoreKindData, LDKeyedFeatureStoreItem, + LDTransactionalFeatureStore, } from '../api/subsystems'; type InitMetadata = internal.InitMetadata; -export default class InMemoryFeatureStore implements LDFeatureStore { +export default class InMemoryFeatureStore implements LDTransactionalFeatureStore { private _allData: LDFeatureStoreDataStorage = {}; private _initCalled = false; private _initMetadata?: InitMetadata; - private _addItem(kind: DataKind, key: string, item: LDFeatureStoreItem) { - let items = this._allData[kind.namespace]; - if (!items) { - items = {}; - this._allData[kind.namespace] = items; - } - if (Object.hasOwnProperty.call(items, key)) { - const old = items[key]; - if (!old || old.version < item.version) { - items[key] = item; - } - } else { - items[key] = item; - } - } - get(kind: DataKind, key: string, callback: (res: LDFeatureStoreItem | null) => void): void { const items = this._allData[kind.namespace]; if (items) { @@ -63,20 +47,75 @@ export default class InMemoryFeatureStore implements LDFeatureStore { callback: () => void, initMetadata?: InitMetadata, ): void { - this._initCalled = true; - this._allData = allData as LDFeatureStoreDataStorage; - this._initMetadata = initMetadata; - callback?.(); + this.applyChanges(true, allData, callback, initMetadata); } delete(kind: DataKind, key: string, version: number, callback: () => void): void { - const deletedItem = { version, deleted: true }; - this._addItem(kind, key, deletedItem); - callback?.(); + const item: LDKeyedFeatureStoreItem = { key, version, deleted: true }; + this.applyChanges( + false, + { + [kind.namespace]: { + [key]: item, + }, + }, + callback, + ); } upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void { - this._addItem(kind, data.key, data); + this.applyChanges( + false, + { + [kind.namespace]: { + [data.key]: data, + }, + }, + callback, + ); + } + + applyChanges( + basis: boolean, + data: LDFeatureStoreDataStorage, + callback: () => void, + initMetadata?: InitMetadata, + _selector?: String, // TODO: SDK-1044 - Utilize selector + ): void { + if (basis) { + this._initCalled = true; + this._allData = data; + this._initMetadata = initMetadata; + } else { + const tempData: LDFeatureStoreDataStorage = {}; + // shallow copy to protect against concurrent read + Object.entries(this._allData).forEach(([namespace, items]) => { + tempData[namespace] = { ...items }; + }); + + Object.entries(data).forEach(([namespace, items]) => { + Object.keys(items || {}).forEach((key) => { + let existingItems = tempData[namespace]; + if (!existingItems) { + existingItems = {}; + tempData[namespace] = existingItems; + } + const item = items[key]; + if (Object.hasOwnProperty.call(existingItems, key)) { + const old = existingItems[key]; + // TODO: SDK-1046 - Determine if version check should be removed + if (!old || old.version < item.version) { + existingItems[key] = { key, ...item }; + } + } else { + existingItems[key] = { key, ...item }; + } + }); + }); + + this._allData = tempData; + } + callback?.(); } diff --git a/packages/shared/sdk-server/src/store/TransactionalFeatureStore.ts b/packages/shared/sdk-server/src/store/TransactionalFeatureStore.ts new file mode 100644 index 0000000000..56d02c7474 --- /dev/null +++ b/packages/shared/sdk-server/src/store/TransactionalFeatureStore.ts @@ -0,0 +1,114 @@ +import { internal } from '@launchdarkly/js-sdk-common'; + +import { DataKind } from '../api/interfaces'; +import { + LDFeatureStore, + LDFeatureStoreDataStorage, + LDFeatureStoreItem, + LDFeatureStoreKindData, + LDKeyedFeatureStoreItem, + LDTransactionalFeatureStore, +} from '../api/subsystems'; +import InMemoryFeatureStore from './InMemoryFeatureStore'; + +/** + * This decorator can take a non-transactional {@link LDFeatureStore} implementation + * and adapt it to be transactional through the use of an in-memory store acting as + * cache. + */ +export default class TransactionalFeatureStore implements LDTransactionalFeatureStore { + private _memoryStore: LDTransactionalFeatureStore; + private _activeStore: LDFeatureStore; + + constructor(private readonly _nonTransPersistenceStore: LDFeatureStore) { + // persistence store is inital active store + this._activeStore = this._nonTransPersistenceStore; + this._memoryStore = new InMemoryFeatureStore(); + } + + get(kind: DataKind, key: string, callback: (res: LDFeatureStoreItem | null) => void): void { + this._activeStore.get(kind, key, callback); + } + + all(kind: DataKind, callback: (res: LDFeatureStoreKindData) => void): void { + this._activeStore.all(kind, callback); + } + + init(allData: LDFeatureStoreDataStorage, callback: () => void): void { + // adapt to applyChanges for common handling + this.applyChanges(true, allData, callback); + } + + delete(kind: DataKind, key: string, version: number, callback: () => void): void { + // adapt to applyChanges for common handling + const item: LDKeyedFeatureStoreItem = { key, version, deleted: true }; + this.applyChanges( + false, + { + [kind.namespace]: { + [key]: item, + }, + }, + callback, + ); + } + + upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void { + // adapt to applyChanges for common handling + this.applyChanges( + false, + { + [kind.namespace]: { + [data.key]: data, + }, + }, + callback, + ); + } + + applyChanges( + basis: boolean, + data: LDFeatureStoreDataStorage, + callback: () => void, + _initMetadata?: internal.InitMetadata, + _selector?: String, // TODO: SDK-1044 - Utilize selector + ): void { + this._memoryStore.applyChanges(basis, data, () => { + // TODO: SDK-1047 conditional propgation to persistence based on parameter + if (basis) { + // basis causes memory store to become the active store + this._activeStore = this._memoryStore; + + this._nonTransPersistenceStore.init(data, callback); + } else { + const promises: Promise[] = []; + Object.entries(data).forEach(([namespace, items]) => { + Object.keys(items || {}).forEach((key) => { + const item = items[key]; + promises.push( + new Promise((resolve) => { + this._nonTransPersistenceStore.upsert({ namespace }, { key, ...item }, resolve); + }), + ); + }); + }); + Promise.all(promises).then(callback); + } + }); + } + + initialized(callback: (isInitialized: boolean) => void): void { + // this is valid because the active store will only switch to the in memory store + // after it has already been initialized itself + this._activeStore.initialized(callback); + } + + close(): void { + this._nonTransPersistenceStore.close(); + this._memoryStore.close(); + } + + getDescription(): string { + return 'transactional persistent store'; + } +} diff --git a/packages/shared/sdk-server/src/store/index.ts b/packages/shared/sdk-server/src/store/index.ts index 047f32648f..b320548d70 100644 --- a/packages/shared/sdk-server/src/store/index.ts +++ b/packages/shared/sdk-server/src/store/index.ts @@ -1,5 +1,13 @@ import AsyncStoreFacade from './AsyncStoreFacade'; +import AsyncTransactionalStoreFacade from './AsyncTransactionalStoreFacade'; import PersistentDataStoreWrapper from './PersistentDataStoreWrapper'; import { deserializePoll } from './serialization'; +import TransactionalFeatureStore from './TransactionalFeatureStore'; -export { AsyncStoreFacade, PersistentDataStoreWrapper, deserializePoll }; +export { + AsyncStoreFacade, + AsyncTransactionalStoreFacade, + PersistentDataStoreWrapper, + TransactionalFeatureStore, + deserializePoll, +}; From 0d24f3b517954d0e829c68223fa4529e721df028 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Wed, 7 May 2025 11:05:07 -0500 Subject: [PATCH 2/4] fixing prettier issue --- packages/shared/sdk-server/src/api/subsystems/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/shared/sdk-server/src/api/subsystems/index.ts b/packages/shared/sdk-server/src/api/subsystems/index.ts index 1c99b80b67..56390adf33 100644 --- a/packages/shared/sdk-server/src/api/subsystems/index.ts +++ b/packages/shared/sdk-server/src/api/subsystems/index.ts @@ -3,4 +3,3 @@ export * from './LDFeatureRequestor'; export * from './LDFeatureStore'; export * from './LDTransactionalDataSourceUpdates'; export * from './LDTransactionalFeatureStore'; - From d2072c2b16706bd3016efc01713c0f0b325b774e Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Thu, 8 May 2025 14:44:40 -0500 Subject: [PATCH 3/4] updating interface definitions to use extends --- .../LDTransactionalDataSourceUpdates.ts | 47 +------ .../subsystems/LDTransactionalFeatureStore.ts | 124 +----------------- .../TransactionalDataSourceUpdates.ts | 4 +- 3 files changed, 8 insertions(+), 167 deletions(-) diff --git a/packages/shared/sdk-server/src/api/subsystems/LDTransactionalDataSourceUpdates.ts b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalDataSourceUpdates.ts index a60d878610..ef6d04099b 100644 --- a/packages/shared/sdk-server/src/api/subsystems/LDTransactionalDataSourceUpdates.ts +++ b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalDataSourceUpdates.ts @@ -1,53 +1,14 @@ import { internal } from '@launchdarkly/js-sdk-common'; -import { DataKind } from '../interfaces'; -import { LDFeatureStoreDataStorage, LDKeyedFeatureStoreItem } from './LDFeatureStore'; +import { LDDataSourceUpdates } from './LDDataSourceUpdates'; +import { LDFeatureStoreDataStorage } from './LDFeatureStore'; type InitMetadata = internal.InitMetadata; /** - * Interface that a data source implementation will use to push data into the SDK. - * - * The data source interacts with this object, rather than manipulating the data store directly, so - * that the SDK can perform any other necessary operations that must happen when data is updated. + * Transactional version of {@link LDDataSourceUpdates} with support for {@link applyChanges} */ -export interface LDTransactionalDataSourceUpdates { - /** - * Completely overwrites the current contents of the data store with a set of items for each - * collection. - * - * @param allData - * An object in which each key is the "namespace" of a collection (e.g. `"features"`) and - * the value is an object that maps keys to entities. The actual type of this parameter is - * `interfaces.FullDataSet`. - * - * @param callback - * Will be called when the store has been initialized. - * - * @param initMetadata - * Optional metadata to initialize the data source with. - */ - init(allData: LDFeatureStoreDataStorage, callback: () => void, initMetadata?: InitMetadata): void; - - /** - * Updates or inserts an item in the specified collection. For updates, the object will only be - * updated if the existing version is less than the new version. - * - * @param kind - * The type of data to be accessed. The actual type of this parameter is - * {@link interfaces.DataKind}. - * - * @param data - * The contents of the entity, as an object that can be converted to JSON. The store - * should check the `version` property of this object, and should *not* overwrite any - * existing data if the existing `version` is greater than or equal to that value. - * The actual type of this parameter is {@link interfaces.VersionedData}. - * - * @param callback - * Will be called after the upsert operation is complete. - */ - upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void; - +export interface LDTransactionalDataSourceUpdates extends LDDataSourceUpdates { /** * @param basis If true, completely overwrites the current contents of the data store * with the provided data. If false, upserts the items in the provided data. Upserts diff --git a/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts index fd3542c8d8..48b4e6c959 100644 --- a/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts +++ b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts @@ -2,6 +2,7 @@ import { internal } from '@launchdarkly/js-sdk-common'; import { DataKind } from '../interfaces'; import { + LDFeatureStore, LDFeatureStoreDataStorage, LDFeatureStoreItem, LDFeatureStoreKindData, @@ -13,101 +14,7 @@ type InitMetadata = internal.InitMetadata; /** * Transactional version of {@link LDFeatureStore} with support for {@link applyChanges} */ -export interface LDTransactionalFeatureStore { - /** - * Get an entity from the store. - * - * The store should treat any entity with the property `deleted: true` as "not found". - * - * @param kind - * The type of data to be accessed. The store should not make any assumptions about the format - * of the data, but just return a JSON object. The actual type of this parameter is - * {@link interfaces.DataKind}. - * - * @param key - * The unique key of the entity within the specified collection. - * - * @param callback - * Will be called with the retrieved entity, or null if not found. The actual type of the result - * value is {@link interfaces.VersionedData}. - */ - get(kind: DataKind, key: string, callback: (res: LDFeatureStoreItem | null) => void): void; - - /** - * Get all entities from a collection. - * - * The store should filter out any entities with the property `deleted: true`. - * - * @param kind - * The type of data to be accessed. The store should not make any assumptions about the format - * of the data, but just return an object in which each key is the `key` property of an entity - * and the value is the entity. The actual type of this parameter is - * {@link interfaces.DataKind}. - * - * @param callback - * Will be called with the resulting map. The actual type of the result value is - * `interfaces.KeyedItems`. - */ - all(kind: DataKind, callback: (res: LDFeatureStoreKindData) => void): void; - - /** - * Initialize the store, overwriting any existing data. - * - * @param allData - * An object in which each key is the "namespace" of a collection (e.g. `"features"`) and - * the value is an object that maps keys to entities. The actual type of this parameter is - * `interfaces.FullDataSet`. - * - * @param callback - * Will be called when the store has been initialized. - * - * @param initMetadata - * Optional metadata to initialize the feature store with. - */ - init(allData: LDFeatureStoreDataStorage, callback: () => void, initMetadata?: InitMetadata): void; - - /** - * Delete an entity from the store. - * - * Deletion should be implemented by storing a placeholder object with the property - * `deleted: true` and a `version` property equal to the provided version. In other words, - * it should be exactly the same as calling `upsert` with such an object. - * - * @param kind - * The type of data to be accessed. The actual type of this parameter is - * {@link interfaces.DataKind}. - * - * @param key - * The unique key of the entity within the specified collection. - * - * @param version - * A number that must be greater than the `version` property of the existing entity in - * order for it to be deleted. If it is less than or equal to the existing version, the - * method should do nothing. - * - * @param callback - * Will be called when the delete operation is complete. - */ - delete(kind: DataKind, key: string, version: number, callback: () => void): void; - - /** - * Add an entity or update an existing entity. - * - * @param kind - * The type of data to be accessed. The actual type of this parameter is - * {@link interfaces.DataKind}. - * - * @param data - * The contents of the entity, as an object that can be converted to JSON. The store - * should check the `version` property of this object, and should *not* overwrite any - * existing data if the existing `version` is greater than or equal to that value. - * The actual type of this parameter is {@link interfaces.VersionedData}. - * - * @param callback - * Will be called after the upsert operation is complete. - */ - upsert(kind: DataKind, data: LDKeyedFeatureStoreItem, callback: () => void): void; - +export interface LDTransactionalFeatureStore extends LDFeatureStore { /** * Applies the provided data onto the existing data, replacing all data or upserting depending * on the basis parameter. Must call {@link applyChanges} providing basis before calling {@link applyChanges} @@ -130,31 +37,4 @@ export interface LDTransactionalFeatureStore { initMetadata?: internal.InitMetadata, selector?: String, ): void; - - /** - * Tests whether the store is initialized. - * - * "Initialized" means that the store has been populated with data, either by the client - * having called `init()` within this process, or by another process (if this is a shared - * database). - * - * @param callback - * Will be called back with the boolean result. - */ - initialized(callback: (isInitialized: boolean) => void): void; - - /** - * Releases any resources being used by the feature store. - */ - close(): void; - - /** - * Get a description of the store. - */ - getDescription?(): string; - - /** - * Get the initialization metadata of the store. - */ - getInitMetaData?(): InitMetadata | undefined; } diff --git a/packages/shared/sdk-server/src/data_sources/TransactionalDataSourceUpdates.ts b/packages/shared/sdk-server/src/data_sources/TransactionalDataSourceUpdates.ts index fea448ed7f..fc0f89b422 100644 --- a/packages/shared/sdk-server/src/data_sources/TransactionalDataSourceUpdates.ts +++ b/packages/shared/sdk-server/src/data_sources/TransactionalDataSourceUpdates.ts @@ -2,10 +2,10 @@ import { internal } from '@launchdarkly/js-sdk-common'; import { DataKind } from '../api/interfaces'; import { - LDDataSourceUpdates, LDFeatureStoreDataStorage, LDFeatureStoreItem, LDKeyedFeatureStoreItem, + LDTransactionalDataSourceUpdates, LDTransactionalFeatureStore, } from '../api/subsystems'; import { Clause } from '../evaluation/data/Clause'; @@ -61,7 +61,7 @@ function computeDependencies(namespace: string, item: LDFeatureStoreItem) { /** * @internal */ -export default class DataSourceUpdates implements LDDataSourceUpdates { +export default class TransactionalDataSourceUpdates implements LDTransactionalDataSourceUpdates { private readonly _dependencyTracker = new DependencyTracker(); constructor( From 80e6c3efce11543114e30b6c8b6c903c7ff7adac Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Thu, 8 May 2025 15:11:13 -0500 Subject: [PATCH 4/4] fixing imports --- .../src/api/subsystems/LDTransactionalFeatureStore.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts index 48b4e6c959..d3c126b8b3 100644 --- a/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts +++ b/packages/shared/sdk-server/src/api/subsystems/LDTransactionalFeatureStore.ts @@ -1,13 +1,6 @@ import { internal } from '@launchdarkly/js-sdk-common'; -import { DataKind } from '../interfaces'; -import { - LDFeatureStore, - LDFeatureStoreDataStorage, - LDFeatureStoreItem, - LDFeatureStoreKindData, - LDKeyedFeatureStoreItem, -} from './LDFeatureStore'; +import { LDFeatureStore, LDFeatureStoreDataStorage } from './LDFeatureStore'; type InitMetadata = internal.InitMetadata;