Skip to content

Commit

Permalink
fix client start and stop, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Jul 9, 2024
1 parent b5f441f commit 85bcf02
Show file tree
Hide file tree
Showing 12 changed files with 227 additions and 134 deletions.
8 changes: 5 additions & 3 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class LocalEvaluationClient {
this.config.cohortConfig?.maxCohortSize,
this.config.debug,
);
new CohortPoller(
this.cohortUpdater = new CohortPoller(
cohortFetcher,
this.cohortStorage,
60000, // this.config.cohortConfig?.cohortPollingIntervalMillis,
Expand Down Expand Up @@ -263,7 +263,8 @@ export class LocalEvaluationClient {
* Calling this function while the poller is already running does nothing.
*/
public async start(): Promise<void> {
return await this.updater.start();
await this.updater.start();
await this.cohortUpdater.start();
}

/**
Expand All @@ -272,6 +273,7 @@ export class LocalEvaluationClient {
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
return this.updater.stop();
this.updater.stop();
this.cohortUpdater.stop();
}
}
61 changes: 10 additions & 51 deletions packages/node/src/local/cohort/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import { Cohort } from 'src/types/cohort';
import { CohortConfigDefaults } from 'src/types/config';
import { HttpClient } from 'src/types/transport';
import { BackoffPolicy, doWithBackoffFailLoudly } from 'src/util/backoff';
import { Mutex, Executor } from 'src/util/mutex';
import { ConsoleLogger, Logger } from 'src/util/logger';
import { Mutex, Executor } from 'src/util/threading';

import { version as PACKAGE_VERSION } from '../../../gen/version';

import { SdkCohortApi } from './cohort-api';

const COHORT_CONFIG_TIMEOUT = 20000;
export const COHORT_CONFIG_TIMEOUT = 20000;

const BACKOFF_POLICY: BackoffPolicy = {
attempts: 3,
Expand All @@ -19,9 +20,10 @@ const BACKOFF_POLICY: BackoffPolicy = {
};

export class CohortFetcher {
private readonly logger: Logger;

readonly cohortApi: SdkCohortApi;
readonly maxCohortSize: number;
readonly debug: boolean;

private readonly inProgressCohorts: Record<
string,
Expand All @@ -44,20 +46,19 @@ export class CohortFetcher {
new WrapperClient(httpClient),
);
this.maxCohortSize = maxCohortSize;
this.debug = debug;
this.logger = new ConsoleLogger(debug);
}

async fetch(
cohortId: string,
lastModified?: number,
): Promise<Cohort | undefined> {
// This block may have async and awaits. No guarantee that executions are not interleaved.
// TODO: Add download concurrency limit.
const unlock = await this.mutex.lock();

if (!this.inProgressCohorts[cohortId]) {
this.inProgressCohorts[cohortId] = this.executor.run(async () => {
console.log('Start downloading', cohortId);
this.logger.debug('Start downloading', cohortId);
const cohort = await doWithBackoffFailLoudly<Cohort>(
async () =>
this.cohortApi.getCohort({
Expand All @@ -82,55 +83,13 @@ export class CohortFetcher {
unlock();
throw err;
});
console.log('Stop downloading', cohortId, cohort['cohortId']);
this.logger.debug('Stop downloading', cohortId, cohort['cohortId']);
return cohort;
});
}

const cohortPromise = this.inProgressCohorts[cohortId];
unlock();
return this.inProgressCohorts[cohortId];
return cohortPromise;
}

// queueMutex = new Mutex();
// queue = [];
// running = 0;

// private startNextTask() {
// const unlock = this.queueMutex.lock();
// if (this.running >= 10) {
// unlock();
// return;
// }

// const nextTask = this.queue[0];
// delete this.queue[0];

// this.running++;
// new Promise((resolve, reject) => {
// nextTask()
// .then((v) => {
// const unlock = this.queueMutex.lock();
// this.running--;
// unlock();
// this.startNextTask();
// return v;
// })
// .catch((err) => {
// const unlock = this.queueMutex.lock();
// this.running--;
// unlock();
// this.startNextTask();
// throw err;
// });
// });

// unlock();
// }

// private queueTask<T>(task: () => Promise<T>): Promise<T> {
// const unlock = this.queueMutex.lock();
// this.queue.push(task);
// unlock();
// this.startNextTask();
// }
}
1 change: 1 addition & 0 deletions packages/node/src/local/cohort/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export class CohortPoller implements CohortUpdater {
this.pollingIntervalMillis = pollingIntervalMillis;
this.logger = new ConsoleLogger(debug);
}

/**
* You must call this function to begin polling for flag config updates.
* The promise returned by this function is resolved when the initial call
Expand Down
11 changes: 2 additions & 9 deletions packages/node/src/local/cohort/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,7 @@ export class InMemoryCohortStorage implements CohortStorage {
this.store[cohort.cohortId] = cohort;
}

putAll(cohorts: Record<string, Cohort>): void {
// Assignments are atomic.
this.store = { ...this.store, ...cohorts };
}

removeAll(cohortIds: Set<string>): void {
cohortIds.forEach((id) => {
delete this.store[id];
});
delete(cohortId: string): void {
delete this.store[cohortId];
}
}
4 changes: 4 additions & 0 deletions packages/node/src/local/cohort/updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ export interface CohortUpdater {
* @throws error if update failed.
*/
update(onChange?: (storage: CohortStorage) => Promise<void>): Promise<void>;

start(onChange?: (storage: CohortStorage) => Promise<void>): Promise<void>;

stop(): void;
}
10 changes: 6 additions & 4 deletions packages/node/src/local/updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ export class FlagConfigUpdaterBase {
const newFlagConfigs = {};
for (const flagKey in flagConfigs) {
// Get cohorts for this flag.
const cohortIdsForFlag = CohortUtils.extractCohortIdsFromFlag(
const cohortIds = CohortUtils.extractCohortIdsFromFlag(
flagConfigs[flagKey],
);

// Check if all cohorts for this flag has downloaded.
// If any cohort failed, don't use the new flag.
const updateFlag =
cohortIdsForFlag.size === 0 ||
[...cohortIdsForFlag]
cohortIds.size === 0 ||
[...cohortIds]
.map((id) => this.cohortStorage.getCohort(id))
.reduce((acc, cur) => acc && cur);

Expand All @@ -165,7 +165,9 @@ export class FlagConfigUpdaterBase {
this.cohortStorage.getAllCohortIds(),
validCohortIds,
);
this.cohortStorage.removeAll(cohortIdsToBeRemoved);
cohortIdsToBeRemoved.forEach((id) => {
this.cohortStorage.delete(id);
});
}

private static setSubtract(one: Set<string>, other: Set<string>) {
Expand Down
3 changes: 1 addition & 2 deletions packages/node/src/types/cohort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ export interface CohortStorage {
cohortIds: Set<string>,
): Set<string>;
put(cohort: Cohort): void;
putAll(cohorts: Record<string, Cohort>): void;
removeAll(cohortIds: Set<string>): void;
delete(cohortIds: string): void;
}

export const USER_GROUP_TYPE = 'User';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// https://news.ycombinator.com/item?id=11823816

export class Mutex {
// https://news.ycombinator.com/item?id=11823816

_locking;

constructor() {
this._locking = Promise.resolve();
}

lock() {
lock(): Promise<() => void> {
let unlockNext;
const willLock = new Promise((resolve) => (unlockNext = resolve));
const willUnlock = this._locking.then(() => unlockNext);
Expand Down Expand Up @@ -40,7 +40,7 @@ export class Semaphore {
return promise;
}

tryRunNext(): void {
private tryRunNext(): void {
if (this.running >= this.limit || this.queue.length == 0) {
return;
}
Expand Down
36 changes: 17 additions & 19 deletions packages/node/test/local/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,25 +265,23 @@ test('ExperimentClient.enrichUserWithCohorts', async () => {
LocalEvaluationDefaults,
new InMemoryFlagConfigCache(),
);
client.cohortStorage.putAll({
cohort1: {
cohortId: 'cohort1',
groupType: USER_GROUP_TYPE,
groupTypeId: 0,
lastComputed: 0,
lastModified: 0,
size: 1,
memberIds: new Set<string>(['userId']),
},
groupcohort1: {
cohortId: 'groupcohort1',
groupType: 'groupname',
groupTypeId: 1,
lastComputed: 0,
lastModified: 0,
size: 1,
memberIds: new Set<string>(['amplitude', 'experiment']),
},
client.cohortStorage.put({
cohortId: 'cohort1',
groupType: USER_GROUP_TYPE,
groupTypeId: 0,
lastComputed: 0,
lastModified: 0,
size: 1,
memberIds: new Set<string>(['userId']),
});
client.cohortStorage.put({
cohortId: 'groupcohort1',
groupType: 'groupname',
groupTypeId: 1,
lastComputed: 0,
lastModified: 0,
size: 1,
memberIds: new Set<string>(['amplitude', 'experiment']),
});
const user = {
user_id: 'userId',
Expand Down
20 changes: 10 additions & 10 deletions packages/node/test/local/cohort/cohortApi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ test('getCohort success', async () => {
body: JSON.stringify({ ...C_A, memberIds: Array.from(C_A.memberIds) }),
};
});
const fetcher = new SdkCohortApi(
const api = new SdkCohortApi(
encodedKey,
serverUrl,
new WrapperClient(httpClient),
);
const cohort = await fetcher.getCohort({
const cohort = await api.getCohort({
cohortId,
maxCohortSize,
libraryName: 'experiment-node-server',
Expand All @@ -142,13 +142,13 @@ test('getCohort 413', async () => {
expect(params.headers).toStrictEqual(expectedHeaders);
return { status: 413, body: '' };
});
const fetcher = new SdkCohortApi(
const api = new SdkCohortApi(
encodedKey,
serverUrl,
new WrapperClient(httpClient),
);
await expect(
fetcher.getCohort({
api.getCohort({
cohortId,
maxCohortSize,
libraryName: 'experiment-node-server',
Expand All @@ -167,13 +167,13 @@ test('getCohort no modification 204', async () => {
expect(params.headers).toStrictEqual(expectedHeaders);
return { status: 204, body: '' };
});
const fetcher = new SdkCohortApi(
const api = new SdkCohortApi(
encodedKey,
serverUrl,
new WrapperClient(httpClient),
);
expect(
await fetcher.getCohort({
await api.getCohort({
cohortId,
maxCohortSize,
lastModified,
Expand All @@ -196,13 +196,13 @@ test('getCohort no modification but still return cohort due to cache miss', asyn
body: JSON.stringify({ ...C_A, memberIds: Array.from(C_A.memberIds) }),
};
});
const fetcher = new SdkCohortApi(
const api = new SdkCohortApi(
encodedKey,
serverUrl,
new WrapperClient(httpClient),
);
expect(
await fetcher.getCohort({
await api.getCohort({
cohortId,
maxCohortSize,
lastModified,
Expand All @@ -225,13 +225,13 @@ test('getCohort other errors', async () => {
body: JSON.stringify({ ...C_A, memberIds: Array.from(C_A.memberIds) }),
};
});
const fetcher = new SdkCohortApi(
const api = new SdkCohortApi(
encodedKey,
serverUrl,
new WrapperClient(httpClient),
);
await expect(
fetcher.getCohort({
api.getCohort({
cohortId,
maxCohortSize,
lastModified,
Expand Down
Loading

0 comments on commit 85bcf02

Please sign in to comment.