Skip to content

Commit

Permalink
changed configs
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Aug 6, 2024
1 parent 7b6750e commit cb05834
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 42 deletions.
19 changes: 12 additions & 7 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const STREAM_RETRY_JITTER_MAX_MILLIS = 2000; // The jitter to add to delay after
const STREAM_ATTEMPTS = 1; // Number of attempts before fallback to poller.
const STREAM_TRY_DELAY_MILLIS = 1000; // The delay between attempts.

const COHORT_POLLING_INTERVAL_MILLIS_MIN = 60000;

/**
* Experiment client for evaluating variants for a user locally.
* @category Core Usage
Expand Down Expand Up @@ -89,21 +91,24 @@ export class LocalEvaluationClient {

this.cohortStorage = new InMemoryCohortStorage();
let cohortFetcher: CohortFetcher = undefined;
if (this.config.cohortConfig) {
if (this.config.cohortSyncConfig) {
cohortFetcher = new CohortFetcher(
this.config.cohortConfig.apiKey,
this.config.cohortConfig.secretKey,
this.config.cohortSyncConfig.apiKey,
this.config.cohortSyncConfig.secretKey,
httpClient,
this.config.cohortConfig?.cohortServerUrl,
this.config.cohortConfig?.maxCohortSize,
this.config.cohortConfig?.cohortRequestDelayMillis,
this.config.cohortSyncConfig?.cohortServerUrl,
this.config.cohortSyncConfig?.maxCohortSize,
undefined,
this.config.debug,
);
this.cohortUpdater = new CohortPoller(
cohortFetcher,
this.cohortStorage,
this.cache,
60000,
Math.max(
COHORT_POLLING_INTERVAL_MILLIS_MIN,
this.config.cohortSyncConfig?.cohortPollingIntervalMillis,
),
this.config.debug,
);
}
Expand Down
11 changes: 11 additions & 0 deletions packages/node/src/local/cohort/cohort-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export interface CohortApi {

export class CohortMaxSizeExceededError extends Error {}

export class CohortClientRequestError extends Error {}

export class CohortDownloadError extends Error {}

export class SdkCohortApi implements CohortApi {
Expand Down Expand Up @@ -79,6 +81,15 @@ export class SdkCohortApi implements CohortApi {
throw new CohortMaxSizeExceededError(
`Cohort size > ${options.maxCohortSize}`,
);
} else if (
400 <= response.status &&
response.status < 500 &&
response.status != 429
) {
// Any 400 other than 429.
throw new CohortClientRequestError(
`Cohort client error response status ${response.status}, body ${response.body}`,
);
} else {
throw new CohortDownloadError(
`Cohort error response status ${response.status}, body ${response.body}`,
Expand Down
18 changes: 13 additions & 5 deletions packages/node/src/local/cohort/fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import { version as PACKAGE_VERSION } from '../../../gen/version';
import { WrapperClient } from '../../transport/http';
import { Cohort } from '../../types/cohort';
import { CohortConfigDefaults } from '../../types/config';
import { CohortSyncConfigDefaults } from '../../types/config';
import { HttpClient } from '../../types/transport';
import { ConsoleLogger, Logger } from '../../util/logger';
import { Mutex, Executor } from '../../util/threading';
import { sleep } from '../../util/time';

import { CohortMaxSizeExceededError, SdkCohortApi } from './cohort-api';
import {
CohortClientRequestError,
CohortMaxSizeExceededError,
SdkCohortApi,
} from './cohort-api';

export const COHORT_CONFIG_TIMEOUT = 20000;

Expand All @@ -31,8 +35,8 @@ export class CohortFetcher {
apiKey: string,
secretKey: string,
httpClient: HttpClient,
serverUrl = CohortConfigDefaults.cohortServerUrl,
maxCohortSize = CohortConfigDefaults.maxCohortSize,
serverUrl = CohortSyncConfigDefaults.cohortServerUrl,
maxCohortSize = CohortSyncConfigDefaults.maxCohortSize,
cohortRequestDelayMillis = 100,
debug = false,
) {
Expand Down Expand Up @@ -78,7 +82,11 @@ export class CohortFetcher {
this.logger.debug('Stop downloading', cohortId);
return cohort;
} catch (e) {
if (i === ATTEMPTS - 1 || e instanceof CohortMaxSizeExceededError) {
if (
i === ATTEMPTS - 1 ||
e instanceof CohortMaxSizeExceededError ||
e instanceof CohortClientRequestError
) {
const unlock = await this.mutex.lock();
delete this.inProgressCohorts[key];
unlock();
Expand Down
20 changes: 14 additions & 6 deletions packages/node/src/types/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export type LocalEvaluationConfig = {
/**
* Select the Amplitude data center to get flags and variants from, `us` or `eu`.
*/
serverZone?: string;
serverZone?: 'us' | 'eu';

/**
* The server endpoint from which to request flags. For hitting the EU data center, use serverZone.
Expand Down Expand Up @@ -184,7 +184,7 @@ export type LocalEvaluationConfig = {
*/
streamFlagConnTimeoutMillis?: number;

cohortConfig?: CohortConfig;
cohortSyncConfig?: CohortSyncConfig;
};

export type AssignmentConfig = {
Expand All @@ -200,7 +200,7 @@ export type AssignmentConfig = {
cacheCapacity?: number;
} & NodeOptions;

export type CohortConfig = {
export type CohortSyncConfig = {
apiKey: string;
secretKey: string;

Expand All @@ -216,7 +216,15 @@ export type CohortConfig = {
*/
maxCohortSize?: number;

cohortRequestDelayMillis?: number;
/**
* The interval in milliseconds to poll the amplitude server for cohort
* updates. These cohorts stored in memory and used when calling evaluate() to
* perform local evaluation.
*
* Default: 60000 (60 seconds)
* Minimum: 60000
*/
cohortPollingIntervalMillis?: number;
};

/**
Expand Down Expand Up @@ -247,11 +255,11 @@ export const AssignmentConfigDefaults: Omit<AssignmentConfig, 'apiKey'> = {
cacheCapacity: 65536,
};

export const CohortConfigDefaults: Omit<CohortConfig, 'apiKey' | 'secretKey'> =
export const CohortSyncConfigDefaults: Omit<CohortSyncConfig, 'apiKey' | 'secretKey'> =
{
cohortServerUrl: 'https://cohort-v2.lab.amplitude.com',
maxCohortSize: 2147483647,
cohortRequestDelayMillis: 100,
cohortPollingIntervalMillis: 60000,
};

export const EU_SERVER_URLS = {
Expand Down
12 changes: 7 additions & 5 deletions packages/node/src/util/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import {
import {
EU_SERVER_URLS,
LocalEvaluationDefaults,
CohortConfigDefaults,
CohortSyncConfigDefaults,
} from '../types/config';

export const populateRemoteConfigDefaults = (
customConfig?: RemoteEvaluationConfig,
): RemoteEvaluationConfig => {
const config = { ...RemoteEvaluationDefaults, ...customConfig };
const isEu = config.serverZone.toLowerCase() === EU_SERVER_URLS.name;
config.serverZone = isEu ? 'eu' : 'us';

if (!customConfig?.serverUrl) {
config.serverUrl = isEu
Expand All @@ -28,6 +29,7 @@ export const populateLocalConfigDefaults = (
): LocalEvaluationConfig => {
const config = { ...LocalEvaluationDefaults, ...customConfig };
const isEu = config.serverZone.toLowerCase() === EU_SERVER_URLS.name;
config.serverZone = isEu ? 'eu' : 'us';

if (!customConfig?.serverUrl) {
config.serverUrl = isEu
Expand All @@ -40,12 +42,12 @@ export const populateLocalConfigDefaults = (
: LocalEvaluationDefaults.streamServerUrl;
}
if (
customConfig?.cohortConfig &&
!customConfig?.cohortConfig.cohortServerUrl
customConfig?.cohortSyncConfig &&
!customConfig?.cohortSyncConfig.cohortServerUrl
) {
config.cohortConfig.cohortServerUrl = isEu
config.cohortSyncConfig.cohortServerUrl = isEu
? EU_SERVER_URLS.cohort
: CohortConfigDefaults.cohortServerUrl;
: CohortSyncConfigDefaults.cohortServerUrl;
}
return config;
};
4 changes: 2 additions & 2 deletions packages/node/test/local/benchmark.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ if (!process.env['API_KEY'] && !process.env['SECRET_KEY']) {
);
}

const cohortConfig = {
const cohortSyncConfig = {
apiKey: process.env['API_KEY'],
secretKey: process.env['SECRET_KEY'],
};

const client = Experiment.initializeLocal(apiKey, {
debug: false,
cohortConfig: cohortConfig,
cohortSyncConfig: cohortSyncConfig,
});

beforeAll(async () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/node/test/local/client.eu.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const apiKey = 'server-Qlp7XiSu6JtP2S3JzA95PnP27duZgQCF';

const client = Experiment.initializeLocal(apiKey, {
serverZone: 'eu',
cohortConfig: {
cohortSyncConfig: {
apiKey: process.env['EU_API_KEY'],
secretKey: process.env['EU_SECRET_KEY'],
},
Expand Down
10 changes: 5 additions & 5 deletions packages/node/test/local/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ const setupEvaluateCohortTestErrorClientCases = (
describe('ExperimentClient end-to-end tests, normal cases', () => {
describe('Normal cases', () => {
const client = Experiment.initializeLocal(apiKey, {
cohortConfig: {
cohortSyncConfig: {
apiKey: process.env['API_KEY'],
secretKey: process.env['SECRET_KEY'],
},
Expand Down Expand Up @@ -302,7 +302,7 @@ describe('ExperimentClient end-to-end tests, normal cases', () => {

describe('Bad cohort config', () => {
const client = Experiment.initializeLocal(apiKey, {
cohortConfig: {
cohortSyncConfig: {
apiKey: 'bad_api_key',
secretKey: 'bad_secret_key',
},
Expand Down Expand Up @@ -368,7 +368,7 @@ describe('ExperimentClient integration tests', () => {
const client = new LocalEvaluationClient(
'apikey',
{
cohortConfig: {
cohortSyncConfig: {
apiKey: 'apiKey',
secretKey: 'secretKey',
maxCohortSize: 10,
Expand Down Expand Up @@ -444,7 +444,7 @@ describe('ExperimentClient integration tests', () => {
const client = new LocalEvaluationClient(
'apikey',
{
cohortConfig: {
cohortSyncConfig: {
apiKey: 'apiKey',
secretKey: 'secretKey',
maxCohortSize: 0,
Expand Down Expand Up @@ -472,7 +472,7 @@ describe('ExperimentClient integration tests', () => {
'apikey',
{
flagConfigPollingIntervalMillis: 40000,
cohortConfig: {
cohortSyncConfig: {
apiKey: 'apiKey',
secretKey: 'secretKey',
maxCohortSize: 10,
Expand Down
4 changes: 2 additions & 2 deletions packages/node/test/local/cohort/cohortFetcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
SdkCohortApi,
} from 'src/local/cohort/cohort-api';
import { COHORT_CONFIG_TIMEOUT, CohortFetcher } from 'src/local/cohort/fetcher';
import { CohortConfigDefaults } from 'src/types/config';
import { CohortSyncConfigDefaults } from 'src/types/config';
import { sleep } from 'src/util/time';

import { version as PACKAGE_VERSION } from '../../../gen/version';
Expand Down Expand Up @@ -58,7 +58,7 @@ test('cohort fetch success', async () => {
lastModified: undefined,
libraryName: 'experiment-node-server',
libraryVersion: PACKAGE_VERSION,
maxCohortSize: CohortConfigDefaults.maxCohortSize,
maxCohortSize: CohortSyncConfigDefaults.maxCohortSize,
timeoutMillis: COHORT_CONFIG_TIMEOUT,
});
});
Expand Down
18 changes: 9 additions & 9 deletions packages/node/test/util/config.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { LocalEvaluationConfig } from 'src/index';
import {
LocalEvaluationDefaults,
CohortConfigDefaults,
CohortSyncConfigDefaults,
EU_SERVER_URLS,
RemoteEvaluationConfig,
} from 'src/types/config';
Expand All @@ -17,12 +17,12 @@ test.each([
'us',
LocalEvaluationDefaults.serverUrl,
LocalEvaluationDefaults.streamServerUrl,
CohortConfigDefaults.cohortServerUrl,
CohortSyncConfigDefaults.cohortServerUrl,
],
],
[
{ zone: 'EU' },
['EU', EU_SERVER_URLS.flags, EU_SERVER_URLS.stream, EU_SERVER_URLS.cohort],
['eu', EU_SERVER_URLS.flags, EU_SERVER_URLS.stream, EU_SERVER_URLS.cohort],
],
[
{ url: 'urlurl', stream: 'streamurl', cohort: 'cohorturl' },
Expand All @@ -38,13 +38,13 @@ test.each([
],
])("'%s'", (testcase, expected) => {
const config: LocalEvaluationConfig = {
cohortConfig: {
cohortSyncConfig: {
apiKey: '',
secretKey: '',
},
};
if ('zone' in testcase) {
config.serverZone = testcase.zone;
config.serverZone = testcase.zone as never;
}
if ('url' in testcase) {
config.serverUrl = testcase.url;
Expand All @@ -53,25 +53,25 @@ test.each([
config.streamServerUrl = testcase.stream;
}
if ('cohort' in testcase) {
config.cohortConfig.cohortServerUrl = testcase.cohort;
config.cohortSyncConfig.cohortServerUrl = testcase.cohort;
}
const newConfig = populateLocalConfigDefaults(config);
expect(newConfig.serverZone).toBe(expected[0]);
expect(newConfig.serverUrl).toBe(expected[1]);
expect(newConfig.streamServerUrl).toBe(expected[2]);
expect(newConfig.cohortConfig.cohortServerUrl).toBe(expected[3]);
expect(newConfig.cohortSyncConfig.cohortServerUrl).toBe(expected[3]);
});

test.each([
[{}, 'us', LocalEvaluationDefaults.serverUrl],
[{ zone: 'EU' }, 'EU', EU_SERVER_URLS.remote],
[{ zone: 'EU' }, 'eu', EU_SERVER_URLS.remote],
[{ url: 'urlurl' }, 'us', 'urlurl'],
[{ zone: 'eu', url: 'urlurl' }, 'eu', 'urlurl'],
[{ zone: 'eu', url: 'urlurl' }, 'eu', 'urlurl'],
])("'%s'", (testcase, expectedZone, expectedUrl) => {
const config: RemoteEvaluationConfig = {};
if ('zone' in testcase) {
config.serverZone = testcase.zone;
config.serverZone = testcase.zone as never;
}
if ('url' in testcase) {
config.serverUrl = testcase.url;
Expand Down

0 comments on commit cb05834

Please sign in to comment.