Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add cohort sync #49

Merged
merged 48 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
8a1140c
move poller outside
zhukaihan Jun 11, 2024
5b3926d
added cohort fetches
zhukaihan Jun 12, 2024
9f2fc2e
remove unused imports
zhukaihan Jun 12, 2024
4eba139
always update cohort and fix test
zhukaihan Jun 12, 2024
88b1453
added cohort server url
zhukaihan Jun 13, 2024
69b74af
remove streamTest file
zhukaihan Jun 13, 2024
42bdc9d
added fetch key to base64 and memberId convert to set
zhukaihan Jun 13, 2024
cb508c1
fixed type
zhukaihan Jun 13, 2024
4dda93d
added cohort to eval context
zhukaihan Jun 13, 2024
025e980
fixed bugs, moved configs, surface cohort errors to flag pollers
zhukaihan Jun 14, 2024
33e709c
storage update only after all cohort loads
zhukaihan Jun 18, 2024
a4b4102
added tests
zhukaihan Jun 20, 2024
dd36cee
added tests
zhukaihan Jun 25, 2024
9b0da10
add ci test with secrets from env
zhukaihan Jun 25, 2024
dfe0517
fix cohortPoller.test.ts
zhukaihan Jun 25, 2024
8a06cd6
not use environment
zhukaihan Jun 25, 2024
992543e
added .env instr, added tests
zhukaihan Jun 25, 2024
113b747
cleanup unnecessary check
zhukaihan Jun 25, 2024
e6a263e
lint
zhukaihan Jun 25, 2024
62b34a5
fix test node version matrix
zhukaihan Jun 25, 2024
7553516
polish test and add macos-13
zhukaihan Jun 27, 2024
f7ddd23
updated gh action node versions to current lts's
zhukaihan Jun 27, 2024
b36b302
remove unsupported node v24
zhukaihan Jun 27, 2024
50dd195
added serverZone config option
zhukaihan Jun 27, 2024
cc611c3
parameterize test
zhukaihan Jun 27, 2024
c5cce04
moved config util code under util
zhukaihan Jun 27, 2024
f23680c
added eu test
zhukaihan Jun 27, 2024
8072381
increase cohort fetch timeout
zhukaihan Jun 28, 2024
b5f441f
update to flag poller loads new cohort, cohort updater polls updates
zhukaihan Jul 3, 2024
85bcf02
fix client start and stop, cleanup
zhukaihan Jul 9, 2024
14e2004
fixed tests
zhukaihan Jul 13, 2024
8ea58b9
fixed typo, env, and err msg
zhukaihan Jul 13, 2024
7cd8adb
fix gh action
zhukaihan Jul 13, 2024
ffc30a6
added streamer test, added streamer onInitUpdate, clearer logic
zhukaihan Jul 13, 2024
b60bb56
add test, add return types, move a util func
zhukaihan Jul 25, 2024
1690d80
fix null cohortUpdater when no cohort configs
zhukaihan Jul 25, 2024
d5e4cd7
fix poller interval and comments
zhukaihan Jul 30, 2024
1010ec4
fix relative imports
zhukaihan Jul 30, 2024
13913f1
add cohortRequestDelayMillis, use sleep util, skip retry if maxCohort…
zhukaihan Jul 30, 2024
fae8ece
unused imports
zhukaihan Jul 30, 2024
a1267d9
fix relative imports attempt 2
zhukaihan Jul 30, 2024
d11f76e
Log error on eval, dont init fail on if cohort fail, add tests
zhukaihan Aug 1, 2024
0e25667
fix lint
zhukaihan Aug 1, 2024
39841e9
add no config integration test
zhukaihan Aug 1, 2024
7b6750e
change default maxCohortSize
zhukaihan Aug 6, 2024
cb05834
changed configs
zhukaihan Aug 6, 2024
d8740c7
fix lint
zhukaihan Aug 7, 2024
e5c6e53
add test, fix comment
zhukaihan Aug 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
strategy:
fail-fast: false
matrix:
node-version: ['14', '16', '18']
node-version: ['16', '18', '20', '22']
os: [macos-latest, ubuntu-latest]
runs-on: ${{ matrix.os }}

Expand All @@ -38,3 +38,8 @@ jobs:

- name: Test
run: yarn test --testPathIgnorePatterns "benchmark.test.ts"
env:
API_KEY: ${{ secrets.API_KEY }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
EU_API_KEY: ${{ secrets.EU_API_KEY }}
EU_SECRET_KEY: ${{ secrets.EU_SECRET_KEY }}
1 change: 1 addition & 0 deletions packages/node/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Ignore generated files
gen
.env*
7 changes: 7 additions & 0 deletions packages/node/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
To setup for running test on local, create a `.env` file with following
contents, and replace `{API_KEY}` and `{SECRET_KEY}` for the project in test:

```
API_KEY={API_KEY}
SECRET_KEY={SECRET_KEY}
```
152 changes: 138 additions & 14 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ import { InMemoryAssignmentFilter } from '../assignment/assignment-filter';
import { AmplitudeAssignmentService } from '../assignment/assignment-service';
import { FetchHttpClient } from '../transport/http';
import { StreamEventSourceFactory } from '../transport/stream';
import { USER_GROUP_TYPE } from '../types/cohort';
import {
AssignmentConfig,
AssignmentConfigDefaults,
LocalEvaluationConfig,
LocalEvaluationDefaults,
} from '../types/config';
import { FlagConfigCache } from '../types/flag';
import { HttpClient } from '../types/transport';
import { ExperimentUser } from '../types/user';
import { Variant, Variants } from '../types/variant';
import { CohortUtils } from '../util/cohort';
import { populateLocalConfigDefaults } from '../util/config';
import { ConsoleLogger } from '../util/logger';
import { Logger } from '../util/logger';
import { convertUserToEvaluationContext } from '../util/user';
Expand All @@ -30,6 +32,10 @@ import {
} from '../util/variant';

import { InMemoryFlagConfigCache } from './cache';
import { CohortFetcher } from './cohort/fetcher';
import { CohortPoller } from './cohort/poller';
import { InMemoryCohortStorage } from './cohort/storage';
import { CohortUpdater } from './cohort/updater';
import { FlagConfigFetcher } from './fetcher';
import { FlagConfigPoller } from './poller';
import { FlagConfigStreamer } from './streamer';
Expand All @@ -40,33 +46,37 @@ const STREAM_RETRY_JITTER_MAX_MILLIS = 2000; // The jitter to add to delay after
const STREAM_ATTEMPTS = 1; // Number of attempts before fallback to poller.
const STREAM_TRY_DELAY_MILLIS = 1000; // The delay between attempts.

const COHORT_POLLING_INTERVAL_MILLIS_MIN = 60000;

/**
* Experiment client for evaluating variants for a user locally.
* @category Core Usage
*/
export class LocalEvaluationClient {
private readonly logger: Logger;
private readonly config: LocalEvaluationConfig;
protected readonly config: LocalEvaluationConfig;
private readonly updater: FlagConfigUpdater;
private readonly assignmentService: AssignmentService;
private readonly evaluation: EvaluationEngine;
private readonly cohortUpdater?: CohortUpdater;

/**
* Directly access the client's flag config cache.
*
* Used for directly manipulating the flag configs used for evaluation.
*/
public readonly cache: InMemoryFlagConfigCache;
public readonly cohortStorage: InMemoryCohortStorage;

constructor(
apiKey: string,
config: LocalEvaluationConfig,
config?: LocalEvaluationConfig,
flagConfigCache?: FlagConfigCache,
httpClient: HttpClient = new FetchHttpClient(config?.httpAgent),
streamEventSourceFactory: StreamEventSourceFactory = (url, params) =>
new EventSource(url, params),
) {
this.config = { ...LocalEvaluationDefaults, ...config };
this.config = populateLocalConfigDefaults(config);
const fetcher = new FlagConfigFetcher(
apiKey,
httpClient,
Expand All @@ -78,27 +88,57 @@ export class LocalEvaluationClient {
this.config.bootstrap,
);
this.logger = new ConsoleLogger(this.config.debug);

this.cohortStorage = new InMemoryCohortStorage();
let cohortFetcher: CohortFetcher = undefined;
if (this.config.cohortSyncConfig) {
cohortFetcher = new CohortFetcher(
this.config.cohortSyncConfig.apiKey,
this.config.cohortSyncConfig.secretKey,
httpClient,
this.config.cohortSyncConfig?.cohortServerUrl,
this.config.cohortSyncConfig?.maxCohortSize,
undefined,
this.config.debug,
);
this.cohortUpdater = new CohortPoller(
cohortFetcher,
this.cohortStorage,
this.cache,
Math.max(
COHORT_POLLING_INTERVAL_MILLIS_MIN,
this.config.cohortSyncConfig?.cohortPollingIntervalMillis,
),
this.config.debug,
);
}

const flagsPoller = new FlagConfigPoller(
fetcher,
this.cache,
this.cohortStorage,
cohortFetcher,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
this.updater = this.config.streamUpdates
? new FlagConfigStreamer(
apiKey,
fetcher,
flagsPoller,
this.cache,
streamEventSourceFactory,
this.config.flagConfigPollingIntervalMillis,
this.config.streamFlagConnTimeoutMillis,
STREAM_ATTEMPTS,
STREAM_TRY_DELAY_MILLIS,
STREAM_RETRY_DELAY_MILLIS +
Math.floor(Math.random() * STREAM_RETRY_JITTER_MAX_MILLIS),
this.config.streamServerUrl,
this.cohortStorage,
cohortFetcher,
this.config.debug,
)
: new FlagConfigPoller(
fetcher,
this.cache,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
: flagsPoller;

if (this.config.assignmentConfig) {
this.config.assignmentConfig = {
...AssignmentConfigDefaults,
Expand Down Expand Up @@ -144,6 +184,7 @@ export class LocalEvaluationClient {
flagKeys?: string[],
): Record<string, Variant> {
const flags = this.cache.getAllCached() as Record<string, EvaluationFlag>;
this.enrichUserWithCohorts(user, flags);
this.logger.debug('[Experiment] evaluate - user:', user, 'flags:', flags);
const context = convertUserToEvaluationContext(user);
const sortedFlags = topologicalSort(flags, flagKeys);
Expand All @@ -153,6 +194,87 @@ export class LocalEvaluationClient {
return evaluationVariantsToVariants(results);
}

protected checkFlagsCohortsAvailable(
cohortIdsByFlag: Record<string, Set<string>>,
): boolean {
const availableCohortIds = this.cohortStorage.getAllCohortIds();
for (const key in cohortIdsByFlag) {
const flagCohortIds = cohortIdsByFlag[key];
const unavailableCohortIds = CohortUtils.setSubtract(
flagCohortIds,
availableCohortIds,
);
if (unavailableCohortIds.size > 0) {
this.logger.error(
`[Experiment] Flag ${key} has cohort ids ${[
...unavailableCohortIds,
]} unavailable, evaluation may be incorrect`,
);
return false;
}
}
return true;
}

protected enrichUserWithCohorts(
user: ExperimentUser,
flags: Record<string, EvaluationFlag>,
): void {
const cohortIdsByFlag: Record<string, Set<string>> = {};
const cohortIdsByGroup = {};
for (const key in flags) {
const cohortIdsByGroupOfFlag =
CohortUtils.extractCohortIdsByGroupFromFlag(flags[key]);

CohortUtils.mergeValuesOfBIntoValuesOfA(
cohortIdsByGroup,
cohortIdsByGroupOfFlag,
);

cohortIdsByFlag[key] = CohortUtils.mergeAllValues(cohortIdsByGroupOfFlag);
}

this.checkFlagsCohortsAvailable(cohortIdsByFlag);

// Enrich cohorts with user group type.
const userCohortIds = cohortIdsByGroup[USER_GROUP_TYPE];
if (user.user_id && userCohortIds && userCohortIds.size != 0) {
user.cohort_ids = Array.from(
this.cohortStorage.getCohortsForUser(user.user_id, userCohortIds),
);
}

// Enrich other group types for this user.
if (user.groups) {
for (const groupType in user.groups) {
const groupNames = user.groups[groupType];
if (groupNames.length == 0) {
continue;
}
const groupName = groupNames[0];

const cohortIds = cohortIdsByGroup[groupType];
if (!cohortIds || cohortIds.size == 0) {
continue;
}

if (!user.group_cohort_ids) {
user.group_cohort_ids = {};
}
if (!(groupType in user.group_cohort_ids)) {
user.group_cohort_ids[groupType] = {};
}
user.group_cohort_ids[groupType][groupName] = Array.from(
this.cohortStorage.getCohortsForGroup(
groupType,
groupName,
cohortIds,
),
);
}
}
}

/**
* Locally evaluates flag variants for a user.
*
Expand Down Expand Up @@ -184,7 +306,8 @@ export class LocalEvaluationClient {
* Calling this function while the poller is already running does nothing.
*/
public async start(): Promise<void> {
return await this.updater.start();
await this.updater.start();
await this.cohortUpdater?.start();
}

/**
Expand All @@ -193,6 +316,7 @@ export class LocalEvaluationClient {
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
return this.updater.stop();
this.updater.stop();
this.cohortUpdater?.stop();
}
}
97 changes: 97 additions & 0 deletions packages/node/src/local/cohort/cohort-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { HttpClient } from '@amplitude/experiment-core';

import { Cohort } from '../../types/cohort';

export type GetCohortOptions = {
libraryName: string;
libraryVersion: string;
cohortId: string;
maxCohortSize: number;
lastModified?: number;
timeoutMillis?: number;
};

export interface CohortApi {
/**
* Calls /sdk/v1/cohort/<cohortId> with query params maxCohortSize and lastModified if specified.
* Returns a promise that
* resolves to a
* Cohort if the cohort downloads successfully or
* undefined if cohort has no change since lastModified timestamp and
* throws an error if download failed.
* @param options
*/
getCohort(options?: GetCohortOptions): Promise<Cohort>;
}

export class CohortClientRequestError extends Error {} // 4xx errors except 429
export class CohortMaxSizeExceededError extends CohortClientRequestError {} // 413 error
export class CohortDownloadError extends Error {} // All other errors

export class SdkCohortApi implements CohortApi {
private readonly cohortApiKey;
private readonly serverUrl;
private readonly httpClient;

constructor(cohortApiKey: string, serverUrl: string, httpClient: HttpClient) {
this.cohortApiKey = cohortApiKey;
this.serverUrl = serverUrl;
this.httpClient = httpClient;
}

public async getCohort(
options?: GetCohortOptions,
): Promise<Cohort | undefined> {
const headers: Record<string, string> = {
Authorization: `Basic ${this.cohortApiKey}`,
};
if (options?.libraryName && options?.libraryVersion) {
headers[
'X-Amp-Exp-Library'
] = `${options.libraryName}/${options.libraryVersion}`;
}

const reqUrl = `${this.serverUrl}/sdk/v1/cohort/${
options.cohortId
}?maxCohortSize=${options.maxCohortSize}${
options.lastModified ? `&lastModified=${options.lastModified}` : ''
}`;
const response = await this.httpClient.request({
requestUrl: reqUrl,
method: 'GET',
headers: headers,
timeoutMillis: options?.timeoutMillis,
});

// Check status code.
// 200: download success.
// 204: no change.
// 413: cohort larger than maxCohortSize
if (response.status == 200) {
const cohort: Cohort = JSON.parse(response.body) as Cohort;
if (Array.isArray(cohort.memberIds)) {
cohort.memberIds = new Set<string>(cohort.memberIds);
}
return cohort;
} else if (response.status == 204) {
return undefined;
} else if (response.status == 413) {
throw new CohortMaxSizeExceededError(
`Cohort size > ${options.maxCohortSize}`,
);
} else if (
400 <= response.status &&
response.status < 500 &&
response.status != 429
) {
// Any 4xx other than 429.
throw new CohortClientRequestError(
`Cohort client error response status ${response.status}, body ${response.body}`,
);
} else {
throw new CohortDownloadError(
`Cohort error response status ${response.status}, body ${response.body}`,
);
}
}
}
Loading
Loading