Skip to content

Refactor: add init method and onReadyFromCacheCb param to storage factory [WIP] #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: development
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
2.4.1 (XXX XX, 2025)
- Updated internal storage factory to emit the SDK_READY_FROM_CACHE event when it corresponds, to clean up the initialization flow.

2.4.0 (May 27, 2025)
- Added support for rule-based segments. These segments determine membership at runtime by evaluating their configured rules against the user attributes provided to the SDK.
- Added support for feature flag prerequisites. This allows customers to define dependency conditions between flags, which are evaluated before any allowlists or targeting rules.
Expand Down
1 change: 1 addition & 0 deletions src/sdkFactory/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export function sdkFactory(params: ISdkFactoryParams): SplitIO.ISDK | SplitIO.IA
// We will just log and allow for the SDK to end up throwing an SDK_TIMEOUT event for devs to handle.
validateAndTrackApiKey(log, settings.core.authorizationKey);
readiness.init();
storage.init && storage.init();
uniqueKeysTracker.start();
syncManager && syncManager.start();
signalListener && signalListener.start();
Expand Down
24 changes: 21 additions & 3 deletions src/storages/inLocalStorage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { RBSegmentsCacheInLocal } from './RBSegmentsCacheInLocal';
import { MySegmentsCacheInLocal } from './MySegmentsCacheInLocal';
import { InMemoryStorageCSFactory } from '../inMemory/InMemoryStorageCS';
import { LOG_PREFIX } from './constants';
import { STORAGE_LOCALSTORAGE } from '../../utils/constants';
import { LOCALHOST_MODE, STORAGE_LOCALSTORAGE } from '../../utils/constants';
import { shouldRecordTelemetry, TelemetryCacheInMemory } from '../inMemory/TelemetryCacheInMemory';
import { UniqueKeysCacheInMemoryCS } from '../inMemory/UniqueKeysCacheInMemoryCS';
import { getMatching } from '../../utils/key';
Expand All @@ -32,7 +32,7 @@ export function InLocalStorage(options: SplitIO.InLocalStorageOptions = {}): ISt
return InMemoryStorageCSFactory(params);
}

const { settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const { onReadyFromCacheCb, settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const matchingKey = getMatching(settings.core.key);
const keys = new KeyBuilderCS(prefix, matchingKey);

Expand All @@ -41,7 +41,7 @@ export function InLocalStorage(options: SplitIO.InLocalStorageOptions = {}): ISt
const segments = new MySegmentsCacheInLocal(log, keys);
const largeSegments = new MySegmentsCacheInLocal(log, myLargeSegmentsKeyBuilder(prefix, matchingKey));

return {
const storage = {
splits,
rbSegments,
segments,
Expand All @@ -52,6 +52,12 @@ export function InLocalStorage(options: SplitIO.InLocalStorageOptions = {}): ISt
telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined,
uniqueKeys: new UniqueKeysCacheInMemoryCS(),

init() {
if (settings.mode === LOCALHOST_MODE || splits.getChangeNumber() > -1) {
Promise.resolve().then(onReadyFromCacheCb);
}
},

validateCache() {
return validateCache(options, settings, keys, splits, rbSegments, segments, largeSegments);
},
Expand All @@ -76,6 +82,18 @@ export function InLocalStorage(options: SplitIO.InLocalStorageOptions = {}): ISt
};
},
};

// @TODO revisit storage logic in localhost mode
// No tracking data in localhost mode to avoid memory leaks
if (params.settings.mode === LOCALHOST_MODE) {
const noopTrack = () => true;
storage.impressions.track = noopTrack;
storage.events.track = noopTrack;
if (storage.impressionCounts) storage.impressionCounts.track = noopTrack;
if (storage.uniqueKeys) storage.uniqueKeys.track = noopTrack;
}

return storage;
}

InLocalStorageCSFactory.type = STORAGE_LOCALSTORAGE;
Expand Down
2 changes: 1 addition & 1 deletion src/storages/inRedis/RedisAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const DEFAULT_OPTIONS = {
const DEFAULT_LIBRARY_OPTIONS = {
enableOfflineQueue: false,
connectTimeout: DEFAULT_OPTIONS.connectionTimeout,
lazyConnect: false
lazyConnect: false // @TODO true to avoid side-effects on instantiation
};

interface IRedisCommand {
Expand Down
4 changes: 4 additions & 0 deletions src/storages/pluggable/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ describe('PLUGGABLE STORAGE', () => {
test('creates a storage instance', async () => {
const storageFactory = PluggableStorage({ prefix, wrapper: wrapperMock });
const storage = storageFactory(internalSdkParams);
storage.init();

assertStorageInterface(storage); // the instance must implement the storage interface
expect(wrapperMock.connect).toBeCalledTimes(1); // wrapper connect method should be called once when storage is created
Expand Down Expand Up @@ -74,6 +75,7 @@ describe('PLUGGABLE STORAGE', () => {
test('creates a storage instance for partial consumer mode (events and impressions cache in memory)', async () => {
const storageFactory = PluggableStorage({ prefix, wrapper: wrapperMock });
const storage = storageFactory({ ...internalSdkParams, settings: { ...internalSdkParams.settings, mode: CONSUMER_PARTIAL_MODE } });
storage.init();

assertStorageInterface(storage);
expect(wrapperMock.connect).toBeCalledTimes(1);
Expand Down Expand Up @@ -102,6 +104,7 @@ describe('PLUGGABLE STORAGE', () => {
// Create storage instance. Wrapper is pollute but doesn't have filter query key, so it should clear the cache
await new Promise(resolve => {
storage = storageFactory({ onReadyCb: resolve, settings: { ...fullSettings, mode: undefined } });
storage.init();
});

// Assert that expected caches are present
Expand All @@ -121,6 +124,7 @@ describe('PLUGGABLE STORAGE', () => {
// Create storage instance. This time the wrapper has the current filter query key, so it should not clear the cache
await new Promise(resolve => {
storage = storageFactory({ onReadyCb: resolve, settings: { ...fullSettings, mode: undefined } });
storage.init();
});

// Assert that cache was not cleared
Expand Down
73 changes: 39 additions & 34 deletions src/storages/pluggable/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IPluggableStorageWrapper, IStorageAsync, IStorageAsyncFactory, IStorageFactoryParams, ITelemetryCacheAsync } from '../types';
import { IPluggableStorageWrapper, IStorageAsyncFactory, IStorageFactoryParams, ITelemetryCacheAsync } from '../types';

import { KeyBuilderSS } from '../KeyBuilderSS';
import { SplitsCachePluggable } from './SplitsCachePluggable';
Expand Down Expand Up @@ -63,11 +63,12 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn

const prefix = validatePrefix(options.prefix);

function PluggableStorageFactory(params: IStorageFactoryParams): IStorageAsync {
function PluggableStorageFactory(params: IStorageFactoryParams) {
const { onReadyCb, settings, settings: { log, mode, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const metadata = metadataBuilder(settings);
const keys = new KeyBuilderSS(prefix, metadata);
const wrapper = wrapperAdapter(log, options.wrapper);
let connectPromise: Promise<void>;

const isSynchronizer = mode === undefined; // If mode is not defined, the synchronizer is running
const isPartialConsumer = mode === CONSUMER_PARTIAL_MODE;
Expand All @@ -86,36 +87,6 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
settings.core.key === undefined ? new UniqueKeysCacheInMemory() : new UniqueKeysCacheInMemoryCS() :
new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper);

// Connects to wrapper and emits SDK_READY event on main client
const connectPromise = wrapper.connect().then(() => {
if (isSynchronizer) {
// @TODO reuse InLocalStorage::validateCache logic
// In standalone or producer mode, clear storage if SDK key, flags filter criteria or flags spec version was modified
return wrapper.get(keys.buildHashKey()).then((hash) => {
const currentHash = getStorageHash(settings);
if (hash !== currentHash) {
log.info(LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache');
return wrapper.getKeysByPrefix(`${keys.prefix}.`).then(storageKeys => {
return Promise.all(storageKeys.map(storageKey => wrapper.del(storageKey)));
}).then(() => wrapper.set(keys.buildHashKey(), currentHash));
}
}).then(() => {
onReadyCb();
});
} else {
// Start periodic flush of async storages if not running synchronizer (producer mode)
if ((impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if ((uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig();

onReadyCb();
}
}).catch((e) => {
e = e || new Error('Error connecting wrapper');
onReadyCb(e);
return e; // Propagate error for shared clients
});

return {
splits: new SplitsCachePluggable(log, keys, wrapper, settings.sync.__splitFiltersValidation),
rbSegments: new RBSegmentsCachePluggable(log, keys, wrapper),
Expand All @@ -126,6 +97,40 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
telemetry,
uniqueKeys: uniqueKeysCache,

init() {
if (connectPromise) return connectPromise;

// Connects to wrapper and emits SDK_READY event on main client
return connectPromise = wrapper.connect().then(() => {
if (isSynchronizer) {
// @TODO reuse InLocalStorage::validateCache logic
// In standalone or producer mode, clear storage if SDK key, flags filter criteria or flags spec version was modified
return wrapper.get(keys.buildHashKey()).then((hash) => {
const currentHash = getStorageHash(settings);
if (hash !== currentHash) {
log.info(LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache');
return wrapper.getKeysByPrefix(`${keys.prefix}.`).then(storageKeys => {
return Promise.all(storageKeys.map(storageKey => wrapper.del(storageKey)));
}).then(() => wrapper.set(keys.buildHashKey(), currentHash));
}
}).then(() => {
onReadyCb();
});
} else {
// Start periodic flush of async storages if not running synchronizer (producer mode)
if ((impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if ((uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig();

onReadyCb();
}
}).catch((e) => {
e = e || new Error('Error connecting wrapper');
onReadyCb(e);
return e; // Propagate error for shared clients
});
},

// Stop periodic flush and disconnect the underlying storage
destroy() {
return Promise.all(isSynchronizer ? [] : [
Expand All @@ -135,8 +140,8 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
},

// emits SDK_READY event on shared clients and returns a reference to the storage
shared(_, onReadyCb) {
connectPromise.then(onReadyCb);
shared(_: string, onReadyCb: (error?: any) => void) {
this.init().then(onReadyCb);

return {
...this,
Expand Down
3 changes: 2 additions & 1 deletion src/storages/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ export interface IStorageBase<
events: TEventsCache,
telemetry?: TTelemetryCache,
uniqueKeys: TUniqueKeysCache,
init?: () => void | Promise<void>,
destroy(): void | Promise<void>,
shared?: (matchingKey: string, onReadyCb: (error?: any) => void) => this
}
Expand Down Expand Up @@ -505,7 +506,7 @@ export interface IStorageFactoryParams {
* It is meant for emitting SDK_READY event in consumer mode, and waiting before using the storage in the synchronizer.
*/
onReadyCb: (error?: any) => void,
onReadyFromCacheCb: () => void,
onReadyFromCacheCb: (error?: any) => void,
}


Expand Down
11 changes: 3 additions & 8 deletions src/sync/offline/syncTasks/fromObjectSyncTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { syncTaskFactory } from '../../syncTask';
import { ISyncTask } from '../../types';
import { ISettings } from '../../../types';
import { CONTROL } from '../../../utils/constants';
import { SDK_SPLITS_ARRIVED, SDK_SEGMENTS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/constants';
import { SDK_SPLITS_ARRIVED, SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants';
import { SYNC_OFFLINE_DATA, ERROR_SYNC_OFFLINE_LOADING } from '../../../logger/constants';

/**
Expand Down Expand Up @@ -59,13 +59,8 @@ export function fromObjectUpdaterFactory(

if (startingUp) {
startingUp = false;
const isCacheLoaded = storage.validateCache ? storage.validateCache() : false;
Promise.resolve().then(() => {
// Emits SDK_READY_FROM_CACHE
if (isCacheLoaded) readiness.splits.emit(SDK_SPLITS_CACHE_LOADED);
// Emits SDK_READY
readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
});
// Emits SDK_READY
readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
}
return true;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { validateStorageCS, __InLocalStorageMockFactory } from '../storageCS';
import { validateStorageCS } from '../storageCS';
import { InMemoryStorageCSFactory } from '../../../../storages/inMemory/InMemoryStorageCS';
import { loggerMock as log } from '../../../../logger/__tests__/sdkLogger.mock';

Expand Down Expand Up @@ -32,11 +32,6 @@ describe('storage validator for pluggable storage (client-side)', () => {
expect(log.error).not.toBeCalled();
});

test('fallbacks to mock InLocalStorage storage if the storage is InLocalStorage and the mode localhost', () => {
expect(validateStorageCS({ log, mode: 'localhost', storage: mockInLocalStorageFactory })).toBe(__InLocalStorageMockFactory);
expect(log.error).not.toBeCalled();
});

test('throws error if the provided storage factory is not compatible with the mode', () => {
expect(() => { validateStorageCS({ log, mode: 'consumer', storage: mockInLocalStorageFactory }); }).toThrow('A PluggableStorage instance is required on consumer mode');
expect(() => { validateStorageCS({ log, mode: 'consumer_partial', storage: mockInLocalStorageFactory }); }).toThrow('A PluggableStorage instance is required on consumer mode');
Expand Down
13 changes: 0 additions & 13 deletions src/utils/settingsValidation/storage/storageCS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@ import SplitIO from '../../../../types/splitio';
import { ILogger } from '../../../logger/types';
import { ERROR_STORAGE_INVALID } from '../../../logger/constants';
import { LOCALHOST_MODE, STANDALONE_MODE, STORAGE_PLUGGABLE, STORAGE_LOCALSTORAGE, STORAGE_MEMORY } from '../../../utils/constants';
import { IStorageFactoryParams, IStorageSync } from '../../../storages/types';

export function __InLocalStorageMockFactory(params: IStorageFactoryParams): IStorageSync {
const result = InMemoryStorageCSFactory(params);
result.validateCache = () => true; // to emit SDK_READY_FROM_CACHE
return result;
}
__InLocalStorageMockFactory.type = STORAGE_MEMORY;

/**
* This function validates `settings.storage` object
Expand All @@ -31,11 +23,6 @@ export function validateStorageCS(settings: { log: ILogger, storage?: any, mode:
log.error(ERROR_STORAGE_INVALID);
}

// In localhost mode with InLocalStorage, fallback to a mock InLocalStorage to emit SDK_READY_FROM_CACHE
if (mode === LOCALHOST_MODE && storage.type === STORAGE_LOCALSTORAGE) {
return __InLocalStorageMockFactory;
}

if ([LOCALHOST_MODE, STANDALONE_MODE].indexOf(mode) === -1) {
// Consumer modes require an async storage
if (storage.type !== STORAGE_PLUGGABLE) throw new Error('A PluggableStorage instance is required on consumer mode');
Expand Down