diff --git a/common/lib/plugins/failover/failover_plugin.ts b/common/lib/plugins/failover/failover_plugin.ts index 7e38ed74..ced0cc6e 100644 --- a/common/lib/plugins/failover/failover_plugin.ts +++ b/common/lib/plugins/failover/failover_plugin.ts @@ -26,7 +26,7 @@ import { PluginService } from "../../plugin_service"; import { ConnectionPlugin } from "../../connection_plugin"; import { HostListProviderService } from "../../host_list_provider_service"; import { ClusterAwareReaderFailoverHandler } from "./reader_failover_handler"; -import { SubscribedMethodHelper } from "../../utils/subsribed_method_helper"; +import { SubscribedMethodHelper } from "../../utils/subscribed_method_helper"; import { HostChangeOptions } from "../../host_change_options"; import { ClusterAwareWriterFailoverHandler } from "./writer_failover_handler"; diff --git a/common/lib/utils/map_utils.ts b/common/lib/utils/map_utils.ts new file mode 100644 index 00000000..41b32315 --- /dev/null +++ b/common/lib/utils/map_utils.ts @@ -0,0 +1,111 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +export class MapUtils { + protected map: Map = new Map(); + + get size(): number { + return this.map.size; + } + + get keys() { + return this.map.keys(); + } + + get entries() { + return this.map.entries(); + } + + get(key: K): V | undefined { + return this.map.get(key); + } + + clear() { + this.map.clear(); + } + + computeIfPresent(key: K, remappingFunc: (key: K, existingValue: V) => V | null): V | undefined { + const existingValue: V | undefined = this.map.get(key); + if (existingValue === undefined) { + return undefined; + } + const newValue: any = remappingFunc(key, existingValue); + if (newValue !== null) { + this.map.set(key, newValue); + return newValue; + } else { + this.map.delete(key); + return undefined; + } + } + + computeIfAbsent(key: K, mappingFunc: (key: K) => V | null): V | undefined { + const value: V | undefined = this.map.get(key); + if (value == undefined) { + const newValue: V | null = mappingFunc(key); + if (newValue !== null) { + this.map.set(key, newValue); + return newValue; + } + return undefined; + } + return value; + } + + putIfAbsent(key: K, newValue: V): V | undefined { + const existingValue: V | undefined = this.map.get(key); + if (existingValue === undefined) { + this.map.set(key, newValue); + return newValue; + } + return existingValue; + } + + remove(key: K): V | undefined { + const value = this.map.get(key); + this.map.delete(key); + return value; + } + + removeIf(predicate: (v: any, k: any) => V): boolean { + const originalSize = this.size; + this.map.forEach((v, k) => { + if (predicate(v, k)) { + this.remove(k); + } + }); + return this.size < originalSize; + } + + removeMatchingValues(removalValues: any[]): boolean { + const originalSize = this.size; + this.map.forEach((v, k) => { + if (removalValues.includes(v)) { + this.remove(k); + } + }); + return this.size < originalSize; + } + + applyIf(predicate: (v: any, k: any) => V, apply: (v: any, k: any) => V): void { + const originalSize = this.size; + this.map.forEach((v, k) => { + if (predicate(v, k)) { + apply(v, k); + } + }); + } +} diff --git a/common/lib/utils/sliding_expiration_cache.ts b/common/lib/utils/sliding_expiration_cache.ts new file mode 100644 index 00000000..cf57e0fb --- /dev/null +++ b/common/lib/utils/sliding_expiration_cache.ts @@ -0,0 +1,122 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { MapUtils } from "./map_utils"; +import { getTimeInNanos } from "aws-wrapper-common-lib/lib/utils/utils"; + +class CacheItem { + private _item: V; + private _expirationTimeNanos: bigint; + + constructor(item: V, expirationTimeNanos: bigint) { + this._item = item; + this._expirationTimeNanos = expirationTimeNanos; + } + + get item(): V { + return this._item; + } + + get expirationTimeNs(): bigint { + return this._expirationTimeNanos; + } + + updateExpiration(expirationIntervalNanos: bigint): CacheItem { + this._expirationTimeNanos = getTimeInNanos() + expirationIntervalNanos; + return this; + } +} + +export class SlidingExpirationCache { + private _cleanupIntervalNanos: bigint = BigInt(10 * 60_000_000_000); // 10 minutes + private readonly _shouldDisposeFunc?: (item: V) => boolean; + private readonly _itemDisposalFunc?: (item: V) => void; + private _map: MapUtils> = new MapUtils>(); + private _cleanupTimeNanos: bigint; + + constructor(cleanupIntervalNanos: bigint, shouldDisposeFunc?: (item: V) => boolean, itemDisposalFunc?: (item: V) => void) { + this._cleanupIntervalNanos = cleanupIntervalNanos; + this._shouldDisposeFunc = shouldDisposeFunc; + this._itemDisposalFunc = itemDisposalFunc; + this._cleanupTimeNanos = getTimeInNanos() + this._cleanupIntervalNanos; + } + + get size(): number { + return this._map.size; + } + + set cleanupIntervalNs(value: bigint) { + this._cleanupIntervalNanos = value; + } + + computeIfAbsent(key: K, mappingFunc: (key: K) => V, itemExpirationNanos: bigint): V | null { + this.cleanUp(); + const cacheItem = this._map.computeIfAbsent(key, (k) => new CacheItem(mappingFunc(k), getTimeInNanos() + itemExpirationNanos)); + return cacheItem?.updateExpiration(itemExpirationNanos).item ?? null; + } + + get(key: K): V | undefined { + this.cleanUp(); + const cacheItem = this._map.get(key); + return cacheItem?.item ?? undefined; + } + + remove(key: K): void { + this.removeAndDispose(key); + this.cleanUp(); + } + + removeAndDispose(key: K): void { + const cacheItem = this._map.remove(key); + if (cacheItem != null && this._itemDisposalFunc != null) { + this._itemDisposalFunc(cacheItem.item); + } + } + + removeIfExpired(key: K): void { + const cacheItem = this._map.get(key); + if (cacheItem == null || this.shouldCleanupItem(cacheItem)) { + this.removeAndDispose(key); + } + } + + shouldCleanupItem(cacheItem: CacheItem): boolean { + if (this._shouldDisposeFunc != null) { + return getTimeInNanos() > cacheItem.expirationTimeNs && this._shouldDisposeFunc(cacheItem.item); + } + return getTimeInNanos() > cacheItem.expirationTimeNs; + } + + clear(): void { + for (const [key, val] of this._map.entries) { + if (val !== undefined && this._itemDisposalFunc !== undefined) { + this._itemDisposalFunc(val.item); + } + } + this._map.clear(); + } + + protected cleanUp() { + const currentTime = getTimeInNanos(); + if (this._cleanupTimeNanos > currentTime) { + return; + } + this._cleanupTimeNanos = currentTime + this._cleanupIntervalNanos; + for (const k of this._map.keys) { + this.removeIfExpired(k); + } + } +} diff --git a/common/lib/utils/subsribed_method_helper.ts b/common/lib/utils/subscribed_method_helper.ts similarity index 100% rename from common/lib/utils/subsribed_method_helper.ts rename to common/lib/utils/subscribed_method_helper.ts diff --git a/common/lib/utils/utils.ts b/common/lib/utils/utils.ts index c215e725..553fcdcf 100644 --- a/common/lib/utils/utils.ts +++ b/common/lib/utils/utils.ts @@ -38,7 +38,7 @@ export function logTopology(hosts: HostInfo[], msgPrefix: string) { } export function getTimeInNanos() { - return performance.now(); + return process.hrtime.bigint(); } export function maskProperties(props: Map) { diff --git a/common/lib/wrapper_property.ts b/common/lib/wrapper_property.ts index 2df6e969..ad45bffa 100644 --- a/common/lib/wrapper_property.ts +++ b/common/lib/wrapper_property.ts @@ -98,7 +98,12 @@ export class WrapperProperties { static removeWrapperProperties(config: T): T { const copy = Object.assign({}, config); - const persistingProperties = [WrapperProperties.USER.name, WrapperProperties.PASSWORD.name, WrapperProperties.DATABASE.name, WrapperProperties.PORT.name]; + const persistingProperties = [ + WrapperProperties.USER.name, + WrapperProperties.PASSWORD.name, + WrapperProperties.DATABASE.name, + WrapperProperties.PORT.name + ]; Object.values(WrapperProperties).forEach((prop) => { if (prop instanceof WrapperProperty) { diff --git a/mysql/lib/client.ts b/mysql/lib/client.ts index 921a4ff3..1a709b46 100644 --- a/mysql/lib/client.ts +++ b/mysql/lib/client.ts @@ -43,7 +43,7 @@ export class AwsMySQLClient extends AwsClient { async connect(): Promise { await this.internalConnect(); - let conn: Promise = this.pluginManager.connect(this.pluginService.getCurrentHostInfo(), this.properties, true, async () => { + const conn: Promise = this.pluginManager.connect(this.pluginService.getCurrentHostInfo(), this.properties, true, async () => { return this.targetClient.promise().connect(); }); this.isConnected = true; diff --git a/pg/lib/client.ts b/pg/lib/client.ts index 1a4afc6c..f4eb868b 100644 --- a/pg/lib/client.ts +++ b/pg/lib/client.ts @@ -39,7 +39,7 @@ export class AwsPGClient extends AwsClient { async connect(): Promise { await this.internalConnect(); - let res: Promise = this.pluginManager.connect(this.pluginService.getCurrentHostInfo(), this.properties, true, () => + const res: Promise = this.pluginManager.connect(this.pluginService.getCurrentHostInfo(), this.properties, true, () => this.targetClient.connect() ); this.isConnected = true; diff --git a/tests/unit/map_utils.test.ts b/tests/unit/map_utils.test.ts new file mode 100644 index 00000000..302079d6 --- /dev/null +++ b/tests/unit/map_utils.test.ts @@ -0,0 +1,159 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { MapUtils as MapUtils } from "aws-wrapper-common-lib/lib/utils/map_utils"; +import { spy, verify } from "ts-mockito"; + +class SomeClass { + someMethod() {} +} + +describe("test_map", () => { + it.each([ + ["a", 2], + ["b", 2] + ])("test_put_if_absent", (key, val) => { + const target = new MapUtils(); + target.putIfAbsent(key, val); + expect(target.size).toEqual(1); + expect(target.get(key)).toEqual(val); + }); + it.each([ + ["a", () => undefined, undefined], + ["b", () => 3, 3] + ])("test_compute_if_absent", (key, val, res) => { + const target = new MapUtils(); + target.computeIfAbsent(key, val); + expect(target.get(key)).toEqual(res); + }); + it.each([1])("test_compute_if_absent", (key) => { + const target = new MapUtils(); + target.computeIfAbsent(key, () => undefined); + expect(target.get(key)).toEqual(undefined); + + target.computeIfAbsent(key, () => "a"); + expect(target.get(key)).toEqual("a"); + + target.computeIfAbsent(key, () => "b"); + expect(target.get(key)).toEqual("a"); + }); + it.each([1])("test_compute_if_present", (key) => { + const target = new MapUtils(); + target.computeIfPresent(key, () => "a"); + expect(target.get(key)).toEqual(undefined); + + target.putIfAbsent(key, "a"); + expect(target.get(key)).toEqual("a"); + target.computeIfPresent(1, () => undefined); + expect(target.get(key)).toEqual(undefined); + + target.putIfAbsent(key, "a"); + expect(target.get(key)).toEqual("a"); + target.computeIfPresent(key, () => "b"); + expect(target.get(key)).toEqual("b"); + }); + it("test_clear", () => { + const target = new MapUtils(); + target.putIfAbsent(1, "a"); + target.putIfAbsent(2, "b"); + expect(target.get(1)).toEqual("a"); + expect(target.get(2)).toEqual("b"); + + target.clear(); + expect(target.get(1)).toEqual(undefined); + expect(target.get(2)).toEqual(undefined); + }); + it("test_remove", () => { + const target = new MapUtils(); + target.putIfAbsent(1, "a"); + target.putIfAbsent(2, "b"); + expect(target.get(1)).toEqual("a"); + expect(target.get(2)).toEqual("b"); + + target.remove(1); + expect(target.get(1)).toEqual(undefined); + expect(target.get(2)).toEqual("b"); + }); + it("test_remove_if", () => { + const target = new MapUtils(); + target.putIfAbsent(1, [1, 2]); + target.putIfAbsent(2, [2, 3]); + target.putIfAbsent(3, [4, 5]); + expect(target.size).toEqual(3); + + expect(target.removeIf((v, k) => v.includes(2))).toBeTruthy(); + expect(target.size).toEqual(1); + expect(target.get(1)).toEqual(undefined); + expect(target.get(2)).toEqual(undefined); + expect(target.get(3)).toEqual([4, 5]); + + expect(target.removeIf((v, k) => v.includes(3))).toBeFalsy(); + expect(target.size).toEqual(1); + expect(target.get(3)).toEqual([4, 5]); + }); + it("test_remove_matching_values", () => { + const target = new MapUtils(); + target.putIfAbsent(1, "a"); + target.putIfAbsent(2, "b"); + target.putIfAbsent(3, "c"); + + expect(target.removeMatchingValues(["a", "b"])).toBeTruthy(); + expect(target.size).toEqual(1); + expect(target.get(1)).toEqual(undefined); + expect(target.get(2)).toEqual(undefined); + expect(target.get(3)).toEqual("c"); + expect(target.removeIf((v, k) => v.includes(3))).toBeFalsy(); + expect(target.size).toEqual(1); + expect(target.get(3)).toEqual("c"); + }); + it("test_apply_if", () => { + const target = new MapUtils(); + const spies = []; + const numObjects = 3; + const numApplications = numObjects - 1; + for (let i = 0; i < numObjects; i++) { + const someObject: SomeClass = new SomeClass(); + target.putIfAbsent(i, someObject); + const spiedObject = spy(someObject); + spies.push(spiedObject); + } + + target.applyIf( + (v, k) => k < numObjects - 1, + (v, k) => v.someMethod() + ); + + for (let i = 0; i < numApplications; i++) { + verify(spies[i].someMethod()).once(); + } + }); +}); diff --git a/tests/unit/sliding_expiration_cache.test.ts b/tests/unit/sliding_expiration_cache.test.ts new file mode 100644 index 00000000..15686df6 --- /dev/null +++ b/tests/unit/sliding_expiration_cache.test.ts @@ -0,0 +1,112 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { SlidingExpirationCache } from "aws-wrapper-common-lib/lib/utils/sliding_expiration_cache"; +import { sleep } from "aws-wrapper-common-lib/lib/utils/utils"; + +class DisposableItem { + shouldDispose: boolean; + disposed: boolean; + constructor(shouldDispose: boolean) { + this.shouldDispose = shouldDispose; + this.disposed = false; + } + + dispose() { + this.disposed = true; + } +} + +describe("test_sliding_expiration_cache", () => { + it("test_compute_if_absent", async () => { + const target = new SlidingExpirationCache(BigInt(50_000_000)); + const result1 = target.computeIfAbsent(1, () => "a", BigInt(1)); + const originalItemExpiration = target._map.get(1)!.expirationTimeNs; + const result2 = target.computeIfAbsent(1, () => "b", BigInt(5)); + const updatedItemExpiration = target._map.get(1)?.expirationTimeNs; + + expect(updatedItemExpiration).toBeGreaterThan(originalItemExpiration); + expect(result1).toEqual("a"); + expect(result2).toEqual("a"); + expect(target.get(1)).toEqual("a"); + + await sleep(700); + const result3 = target.computeIfAbsent(1, () => "b", BigInt(5)); + expect(result3).toEqual("b"); + expect(target.get(1)).toEqual("b"); + }); + it("test_remove", async () => { + const target = new SlidingExpirationCache( + BigInt(50_000_000), + (item: DisposableItem) => item.shouldDispose, + (item) => item.dispose() + ); + const itemToRemove = new DisposableItem(true); + let result = target.computeIfAbsent("itemToRemove", () => itemToRemove, BigInt(15_000_000_000)); + expect(itemToRemove).toEqual(result); + + const itemToCleanup = new DisposableItem(true); + result = target.computeIfAbsent("itemToCleanup", () => itemToCleanup, BigInt(1)); + expect(itemToCleanup).toEqual(result); + + const nonDisposableItem = new DisposableItem(false); + result = target.computeIfAbsent("nonDisposableItem", () => nonDisposableItem, BigInt(1)); + expect(nonDisposableItem).toEqual(result); + + const nonExpiredItem = new DisposableItem(true); + result = target.computeIfAbsent("nonExpiredItem", () => nonExpiredItem, BigInt(15_000_000_000)); + expect(nonExpiredItem).toEqual(result); + + await sleep(700); + target.remove("itemToRemove"); + + expect(target.get("itemToRemove")).toEqual(undefined); + expect(itemToRemove.disposed).toEqual(true); + + expect(target.get("itemToCleanup")).toEqual(undefined); + expect(itemToRemove.disposed).toEqual(true); + + expect(target.get("nonDisposableItem")).toEqual(nonDisposableItem); + expect(nonDisposableItem.disposed).toEqual(false); + + expect(target.get("nonExpiredItem")).toEqual(nonExpiredItem); + expect(nonExpiredItem.disposed).toEqual(false); + }); + it("test_clear", async () => { + const target = new SlidingExpirationCache( + BigInt(50_000_000), + (item: DisposableItem) => item.shouldDispose, + (item) => item.dispose() + ); + const item1 = new DisposableItem(false); + const item2 = new DisposableItem(false); + + target.computeIfAbsent(1, () => item1, BigInt(15_000_000_000)); + target.computeIfAbsent(2, () => item2, BigInt(15_000_000_000)); + + expect(target.size).toEqual(2); + expect(target.get(1)).toEqual(item1); + expect(target.get(2)).toEqual(item2); + + target.clear(); + + expect(target.size).toEqual(0); + expect(target.get(1)).toEqual(undefined); + expect(target.get(2)).toEqual(undefined); + expect(item1.disposed).toEqual(true); + expect(item2.disposed).toEqual(true); + }); +});