From 323544bd4eae431bd8e6d6248099355a6fd04627 Mon Sep 17 00:00:00 2001 From: joyc-bq <95259163+joyc-bq@users.noreply.github.com> Date: Wed, 6 Nov 2024 13:03:06 -0500 Subject: [PATCH] feat: least connections strategy and fix databasePools computeIfAbsent (#282) --- common/lib/aws_pool_client.ts | 2 + .../internal_pooled_connection_provider.ts | 78 +++++----- common/lib/least_connections_host_selector.ts | 64 +++++++++ common/lib/mysql_client_wrapper.ts | 6 +- common/lib/pg_client_wrapper.ts | 6 +- common/lib/pool_client_wrapper.ts | 6 +- common/lib/utils/pool_key.ts | 6 +- common/lib/utils/sliding_expiration_cache.ts | 1 + mysql/lib/dialect/mysql2_driver_dialect.ts | 6 +- mysql/lib/mysql_pool_client.ts | 4 + pg/lib/pg_pool_client.ts | 4 + .../tests/read_write_splitting.test.ts | 133 +++++++++++++++++- .../tests/utils/test_database_info.ts | 4 + .../container/tests/utils/test_environment.ts | 6 +- .../tests/utils/test_telemetry_info.ts | 6 +- tests/plugin_manager_telemetry_benchmarks.ts | 6 +- .../internal_pool_connection_provider.test.ts | 88 ++++++++---- 17 files changed, 331 insertions(+), 95 deletions(-) create mode 100644 common/lib/least_connections_host_selector.ts diff --git a/common/lib/aws_pool_client.ts b/common/lib/aws_pool_client.ts index c6b9b9d0..8cd3c5e8 100644 --- a/common/lib/aws_pool_client.ts +++ b/common/lib/aws_pool_client.ts @@ -24,4 +24,6 @@ export interface AwsPoolClient { getIdleCount(): number; getTotalCount(): number; + + getActiveCount(): number; } diff --git a/common/lib/internal_pooled_connection_provider.ts b/common/lib/internal_pooled_connection_provider.ts index d7843b0d..e60c8132 100644 --- a/common/lib/internal_pooled_connection_provider.ts +++ b/common/lib/internal_pooled_connection_provider.ts @@ -33,40 +33,41 @@ import { Messages } from "./utils/messages"; import { HostSelector } from "./host_selector"; import { RandomHostSelector } from "./random_host_selector"; import { InternalPoolMapping } from "./utils/internal_pool_mapping"; -import { logger } from "../logutils"; import { RoundRobinHostSelector } from "./round_robin_host_selector"; import { AwsPoolClient } from "./aws_pool_client"; import { AwsPoolConfig } from "./aws_pool_config"; +import { LeastConnectionsHostSelector } from "./least_connections_host_selector"; import { PoolClientWrapper } from "./pool_client_wrapper"; +import { logger } from "../logutils"; export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources { - private static readonly acceptedStrategies: Map = new Map([ - [RandomHostSelector.STRATEGY_NAME, new RandomHostSelector()], - [RoundRobinHostSelector.STRATEGY_NAME, new RoundRobinHostSelector()] - ]); static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 60 minutes static readonly POOL_EXPIRATION_NANOS: bigint = BigInt(30_000_000_000); // 30 minutes + protected static databasePools: SlidingExpirationCache = new SlidingExpirationCache( + InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS, + (pool: any) => pool.getActiveCount() === 0, + (pool: any) => pool.end() + ); + private static readonly acceptedStrategies: Map = new Map([ + [RandomHostSelector.STRATEGY_NAME, new RandomHostSelector()], + [RoundRobinHostSelector.STRATEGY_NAME, new RoundRobinHostSelector()], + [LeastConnectionsHostSelector.STRATEGY_NAME, new LeastConnectionsHostSelector(InternalPooledConnectionProvider.databasePools)] + ]); private readonly rdsUtil: RdsUtils = new RdsUtils(); private readonly _poolMapping?: InternalPoolMapping; private readonly _poolConfig?: AwsPoolConfig; targetClient?: ClientWrapper; internalPool: AwsPoolClient | undefined; - protected databasePools: SlidingExpirationCache = new SlidingExpirationCache( - InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS, - (pool: any) => pool.getIdleCount() === pool.getTotalCount(), - (pool: any) => pool.end() - ); - - poolExpirationCheckNanos: bigint = InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS; // 30 minutes + private static poolExpirationCheckNanos: bigint = InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS; // 30 minutes constructor(poolConfig?: AwsPoolConfig); constructor(poolConfig?: AwsPoolConfig, mapping?: InternalPoolMapping); constructor(poolConfig?: AwsPoolConfig, mapping?: InternalPoolMapping, poolExpirationNanos?: bigint, poolCleanupNanos?: bigint) { this._poolMapping = mapping; - this.poolExpirationCheckNanos = poolExpirationNanos ?? InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS; - this.databasePools.cleanupIntervalNs = poolCleanupNanos ?? InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS; + InternalPooledConnectionProvider.poolExpirationCheckNanos = poolExpirationNanos ?? InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS; + InternalPooledConnectionProvider.databasePools.cleanupIntervalNs = poolCleanupNanos ?? InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS; this._poolConfig = poolConfig ?? new AwsPoolConfig({}); } @@ -107,25 +108,30 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide const dialect = pluginService.getDriverDialect(); const preparedConfig = dialect.preparePoolClientProperties(props, this._poolConfig); - - this.internalPool = this.databasePools.computeIfAbsent( - new PoolKey(connectionHostInfo.url, this.getPoolKey(connectionHostInfo, props)), + this.internalPool = InternalPooledConnectionProvider.databasePools.computeIfAbsent( + new PoolKey(connectionHostInfo.url, this.getPoolKey(connectionHostInfo, props)).getPoolKeyString(), () => dialect.getAwsPoolClient(preparedConfig), - this.poolExpirationCheckNanos + InternalPooledConnectionProvider.poolExpirationCheckNanos ); - return await this.getPoolConnection(hostInfo, props); + return await this.getPoolConnection(connectionHostInfo, props); } async getPoolConnection(hostInfo: HostInfo, props: Map) { return new PoolClientWrapper(await this.internalPool!.connect(), hostInfo, props); } - public async releaseResources() { - for (const [key, val] of this.databasePools.entries) { - val.item.releaseResources(); + async releaseResources() { + for (const [_key, value] of InternalPooledConnectionProvider.databasePools.entries) { + if (value.item) { + await value.item.releaseResources(); + } } - this.databasePools.clear(); + InternalPooledConnectionProvider.clearDatabasePools(); + } + + static clearDatabasePools() { + InternalPooledConnectionProvider.databasePools.clear(); } getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map): HostInfo { @@ -140,20 +146,12 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide return promisify(lookup)(host, {}); } - getHostUrlSet(): Set { - const hostUrls: Set = new Set(); - for (const [key, _val] of this.databasePools.entries) { - hostUrls.add(key.getUrl()); - } - return hostUrls; - } - getHostCount() { - return this.databasePools.size; + return InternalPooledConnectionProvider.databasePools.size; } - getKeySet(): Set { - return new Set(this.databasePools.keys); + getKeySet(): Set { + return new Set(InternalPooledConnectionProvider.databasePools.keys); } getPoolKey(hostInfo: HostInfo, props: Map) { @@ -161,15 +159,17 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide } logConnections() { - if (this.databasePools.size === 0) { + if (InternalPooledConnectionProvider.databasePools.size === 0) { return; } - const l = Array.from(this.databasePools.entries).map(([v, k]) => [v.getPoolKeyString(), k.item.constructor.name]); - logger.debug(`Internal Pooled Connection: [${JSON.stringify(l)}]`); + const connections = Array.from(InternalPooledConnectionProvider.databasePools.entries).map(([v, k]) => [v, k.item.constructor.name]); + logger.debug(`Internal Pooled Connections: [\r\n${connections.join("\r\n")}\r\n]`); } - setDatabasePools(connectionPools: SlidingExpirationCache): void { - this.databasePools = connectionPools; + // for testing only + setDatabasePools(connectionPools: SlidingExpirationCache): void { + InternalPooledConnectionProvider.databasePools = connectionPools; + LeastConnectionsHostSelector.setDatabasePools(connectionPools); } } diff --git a/common/lib/least_connections_host_selector.ts b/common/lib/least_connections_host_selector.ts new file mode 100644 index 00000000..58a6ed86 --- /dev/null +++ b/common/lib/least_connections_host_selector.ts @@ -0,0 +1,64 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { HostSelector } from "./host_selector"; +import { HostInfo } from "./host_info"; +import { HostRole } from "./host_role"; +import { AwsWrapperError } from "./utils/errors"; +import { HostAvailability } from "./host_availability/host_availability"; +import { Messages } from "./utils/messages"; +import { SlidingExpirationCache } from "./utils/sliding_expiration_cache"; + +export class LeastConnectionsHostSelector implements HostSelector { + protected static databasePools: SlidingExpirationCache; + static readonly STRATEGY_NAME = "leastConnections"; + + constructor(databasePools: SlidingExpirationCache) { + LeastConnectionsHostSelector.databasePools = databasePools; + } + + getHost(hosts: HostInfo[], role: HostRole, props?: Map): HostInfo { + const eligibleHosts: HostInfo[] = hosts + .filter((host: HostInfo) => host.role === role && host.availability === HostAvailability.AVAILABLE) + .sort((hostA: HostInfo, hostB: HostInfo) => { + const hostACount = this.getNumConnections(hostA, LeastConnectionsHostSelector.databasePools); + const hostBCount = this.getNumConnections(hostB, LeastConnectionsHostSelector.databasePools); + return hostACount < hostBCount ? -1 : hostACount > hostBCount ? 1 : 0; + }); + + if (eligibleHosts.length === 0) { + throw new AwsWrapperError(Messages.get("HostSelector.noHostsMatchingRole", role)); + } + return eligibleHosts[0]; + } + + getNumConnections(hostInfo: HostInfo, databasePools: SlidingExpirationCache): number { + let numConnections: number = 0; + const url: string = hostInfo.url; + for (const [key, val] of databasePools.map.entries()) { + if (!key.includes(url)) { + continue; + } + numConnections += val.item.getActiveCount(); + } + return numConnections; + } + + // for testing purposes only + static setDatabasePools(connectionPools: SlidingExpirationCache): void { + LeastConnectionsHostSelector.databasePools = connectionPools; + } +} diff --git a/common/lib/mysql_client_wrapper.ts b/common/lib/mysql_client_wrapper.ts index d2918e9e..032b8f1b 100644 --- a/common/lib/mysql_client_wrapper.ts +++ b/common/lib/mysql_client_wrapper.ts @@ -1,12 +1,12 @@ /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - + Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/lib/pg_client_wrapper.ts b/common/lib/pg_client_wrapper.ts index e7820340..7e87f3a7 100644 --- a/common/lib/pg_client_wrapper.ts +++ b/common/lib/pg_client_wrapper.ts @@ -1,12 +1,12 @@ /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - + Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/lib/pool_client_wrapper.ts b/common/lib/pool_client_wrapper.ts index 85150a61..ba499c26 100644 --- a/common/lib/pool_client_wrapper.ts +++ b/common/lib/pool_client_wrapper.ts @@ -1,12 +1,12 @@ /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - + Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/lib/utils/pool_key.ts b/common/lib/utils/pool_key.ts index 25902a18..2d830169 100644 --- a/common/lib/utils/pool_key.ts +++ b/common/lib/utils/pool_key.ts @@ -27,7 +27,11 @@ export class PoolKey { return this.url; } + getExtraKey(): string { + return this.extraKey; + } + getPoolKeyString(): string { - return "PoolKey [url=" + this.url + ", extraKey=" + this.extraKey + "]"; + return `PoolKey [url=${this.getUrl()}, extraKey=${this.getExtraKey()}]`; } } diff --git a/common/lib/utils/sliding_expiration_cache.ts b/common/lib/utils/sliding_expiration_cache.ts index 923161b2..8511cfd5 100644 --- a/common/lib/utils/sliding_expiration_cache.ts +++ b/common/lib/utils/sliding_expiration_cache.ts @@ -61,6 +61,7 @@ export class SlidingExpirationCache { get keys() { return this.map.keys(); } + set cleanupIntervalNs(value: bigint) { this._cleanupIntervalNanos = value; } diff --git a/mysql/lib/dialect/mysql2_driver_dialect.ts b/mysql/lib/dialect/mysql2_driver_dialect.ts index 18280be9..4f1dc3eb 100644 --- a/mysql/lib/dialect/mysql2_driver_dialect.ts +++ b/mysql/lib/dialect/mysql2_driver_dialect.ts @@ -1,12 +1,12 @@ /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - + Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/mysql/lib/mysql_pool_client.ts b/mysql/lib/mysql_pool_client.ts index 77addfa0..591fd0fd 100644 --- a/mysql/lib/mysql_pool_client.ts +++ b/mysql/lib/mysql_pool_client.ts @@ -50,6 +50,10 @@ export class AwsMysqlPoolClient implements AwsPoolClient { return this.targetPool.pool._allConnections.length; } + getActiveCount(): number { + return this.getTotalCount() - this.getIdleCount(); + } + async releaseResources(): Promise { await this.targetPool.end(); } diff --git a/pg/lib/pg_pool_client.ts b/pg/lib/pg_pool_client.ts index c17e96d0..4d8055ef 100644 --- a/pg/lib/pg_pool_client.ts +++ b/pg/lib/pg_pool_client.ts @@ -57,6 +57,10 @@ export class AwsPgPoolClient implements AwsPoolClient { return this.targetPool.totalCount; } + getActiveCount(): number { + return this.getTotalCount() - this.getIdleCount(); + } + async releaseResources(): Promise { await this.targetPool.end(); } diff --git a/tests/integration/container/tests/read_write_splitting.test.ts b/tests/integration/container/tests/read_write_splitting.test.ts index 0927ba4c..d313d9d0 100644 --- a/tests/integration/container/tests/read_write_splitting.test.ts +++ b/tests/integration/container/tests/read_write_splitting.test.ts @@ -27,10 +27,13 @@ import { features, instanceCount } from "./config"; import { InternalPooledConnectionProvider } from "../../../../common/lib/internal_pooled_connection_provider"; import { AwsPoolConfig } from "../../../../common/lib/aws_pool_config"; import { ConnectionProviderManager } from "../../../../common/lib/connection_provider_manager"; +import { InternalPoolMapping } from "../../../../common/lib/utils/internal_pool_mapping"; +import { HostInfo } from "../../../../common/lib/host_info"; const itIf = !features.includes(TestEnvironmentFeatures.PERFORMANCE) && features.includes(TestEnvironmentFeatures.IAM) && instanceCount >= 2 ? it : it.skip; const itIfMinThreeInstance = instanceCount >= 3 ? itIf : it.skip; +const itIfMinFiveInstance = instanceCount >= 5 ? itIf : it.skip; let env: TestEnvironment; let driver; @@ -107,6 +110,7 @@ describe("aurora read write splitting", () => { if (provider !== null) { try { await ConnectionProviderManager.releaseResources(); + logger.debug("Successfully released all pooled connections"); ConnectionProviderManager.resetProvider(); } catch (error) { // pass @@ -486,10 +490,10 @@ describe("aurora read write splitting", () => { "test pooled connection failover", async () => { const config = await initConfigWithFailover(env.databaseInfo.writerInstanceEndpoint, env.databaseInfo.instanceEndpointPort, false); - client = initClientFunc(config); - const provider = new InternalPooledConnectionProvider(); + provider = new InternalPooledConnectionProvider(); ConnectionProviderManager.setConnectionProvider(provider); + client = initClientFunc(config); client.on("error", (error: any) => { logger.debug(`event emitter threw error: ${error.message}`); logger.debug(error.stack); @@ -506,12 +510,24 @@ describe("aurora read write splitting", () => { const newWriterId = await auroraTestUtility.queryInstanceId(client); expect(newWriterId).not.toBe(initialWriterId); - await client.connect(); + + secondaryClient = initClientFunc(config); + secondaryClient.on("error", (error: any) => { + logger.debug(`event emitter threw error: ${error.message}`); + logger.debug(error.stack); + }); + + await secondaryClient.connect(); provider.logConnections(); - const oldWriterId = await auroraTestUtility.queryInstanceId(client); + const oldWriterId = await auroraTestUtility.queryInstanceId(secondaryClient); // This should be a new connection to the initial writer instance (now a reader). expect(oldWriterId).toBe(initialWriterId); provider.logConnections(); + try { + await secondaryClient.end(); + } catch (error) { + // pass + } }, 1000000 ); @@ -523,7 +539,13 @@ describe("aurora read write splitting", () => { client = initClientFunc(config); secondaryClient = initClientFunc(config); - const provider = new InternalPooledConnectionProvider(new AwsPoolConfig({ minConnections: 0, maxConnections: 10, maxIdleConnections: 10 })); + provider = new InternalPooledConnectionProvider( + new AwsPoolConfig({ + minConnections: 0, + maxConnections: 10, + maxIdleConnections: 10 + }) + ); ConnectionProviderManager.setConnectionProvider(provider); @@ -544,6 +566,8 @@ describe("aurora read write splitting", () => { provider.logConnections(); try { await secondaryClient.end(); + await ConnectionProviderManager.releaseResources(); + ConnectionProviderManager.resetProvider(); } catch (error) { // pass } @@ -596,8 +620,6 @@ describe("aurora read write splitting", () => { client = initClientFunc(config); provider = new InternalPooledConnectionProvider(); - await ConnectionProviderManager.releaseResources(); // make sure there's no pool's left after prior test - ConnectionProviderManager.setConnectionProvider(provider); client.on("error", (error: any) => { @@ -629,4 +651,101 @@ describe("aurora read write splitting", () => { }, 1000000 ); + + itIfMinFiveInstance( + "test pooled connection least connections strategy", + async () => { + const numInstances = env.databaseInfo.instances.length; + const connectedReaderIds: Set = new Set(); + const connectionsSet: Set = new Set(); + try { + provider = new InternalPooledConnectionProvider({ maxConnections: numInstances }); + ConnectionProviderManager.setConnectionProvider(provider); + const config = await initDefaultConfig(env.databaseInfo.writerInstanceEndpoint, env.databaseInfo.instanceEndpointPort, false); + config["readerHostSelectorStrategy"] = "leastConnections"; + + // Assume one writer and [size - 1] readers + for (let i = 0; i < numInstances - 1; i++) { + const client = initClientFunc(config); + client.on("error", (error: any) => { + logger.debug(`event emitter threw error: ${error.message}`); + logger.debug(error.stack); + }); + + await client.connect(); + await client.setReadOnly(true); + const readerId = await auroraTestUtility.queryInstanceId(client); + expect(connectedReaderIds).not.toContain(readerId); + connectedReaderIds.add(readerId); + connectionsSet.add(client); + } + } finally { + for (const connection of connectionsSet) { + await connection.end(); + } + } + }, + 1000000 + ); + + itIfMinFiveInstance( + "test pooled connection least connections with pooled mapping", + async () => { + const config = await initDefaultConfig(env.databaseInfo.writerInstanceEndpoint, env.databaseInfo.instanceEndpointPort, false); + config["readerHostSelectorStrategy"] = "leastConnections"; + + const myKeyFunc: InternalPoolMapping = { + getKey: (hostInfo: HostInfo, props: Map) => { + return hostInfo.url + props.get("arbitraryProp"); + } + }; + + // We will be testing all instances excluding the writer and overloaded reader. Each instance + // should be tested numOverloadedReaderConnections times to increase the pool connection count + // until it equals the connection count of the overloaded reader. + const numOverloadedReaderConnections = 3; + const numInstances = env.databaseInfo.instances.length; + const numTestConnections = (numInstances - 2) * numOverloadedReaderConnections; + provider = new InternalPooledConnectionProvider({ maxConnections: numTestConnections }, myKeyFunc); + ConnectionProviderManager.setConnectionProvider(provider); + + let overloadedReaderId; + const connectionsSet: Set = new Set(); + try { + for (let i = 0; i < numOverloadedReaderConnections; i++) { + const readerConfig = await initDefaultConfig(env.databaseInfo.readerInstanceEndpoint, env.databaseInfo.instanceEndpointPort, false); + readerConfig["arbitraryProp"] = "value" + i.toString(); + readerConfig["readerHostSelectorStrategy"] = "leastConnections"; + const client = initClientFunc(readerConfig); + client.on("error", (error: any) => { + logger.debug(`event emitter threw error: ${error.message}`); + logger.debug(error.stack); + }); + await client.connect(); + connectionsSet.add(client); + if (i === 0) { + overloadedReaderId = await auroraTestUtility.queryInstanceId(client); + } + } + + for (let i = 0; i < numTestConnections - 1; i++) { + const client = initClientFunc(config); + client.on("error", (error: any) => { + logger.debug(`event emitter threw error: ${error.message}`); + logger.debug(error.stack); + }); + await client.connect(); + await client.setReadOnly(true); + const readerId = await auroraTestUtility.queryInstanceId(client); + expect(readerId).not.toBe(overloadedReaderId); + connectionsSet.add(client); + } + } finally { + for (const connection of connectionsSet) { + await connection.end(); + } + } + }, + 1000000 + ); }); diff --git a/tests/integration/container/tests/utils/test_database_info.ts b/tests/integration/container/tests/utils/test_database_info.ts index f5dbedd4..19bc25ee 100644 --- a/tests/integration/container/tests/utils/test_database_info.ts +++ b/tests/integration/container/tests/utils/test_database_info.ts @@ -61,6 +61,10 @@ export class TestDatabaseInfo { return this._instances[0].host ?? ""; } + get readerInstanceEndpoint() { + return this._instances[1].host ?? ""; + } + get writerInstanceId() { return this._instances[0].instanceId; } diff --git a/tests/integration/container/tests/utils/test_environment.ts b/tests/integration/container/tests/utils/test_environment.ts index f60d1252..f28734bc 100644 --- a/tests/integration/container/tests/utils/test_environment.ts +++ b/tests/integration/container/tests/utils/test_environment.ts @@ -1,12 +1,12 @@ /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - + Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/tests/integration/container/tests/utils/test_telemetry_info.ts b/tests/integration/container/tests/utils/test_telemetry_info.ts index 46de09f8..b8e49c42 100644 --- a/tests/integration/container/tests/utils/test_telemetry_info.ts +++ b/tests/integration/container/tests/utils/test_telemetry_info.ts @@ -1,12 +1,12 @@ /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - + Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/tests/plugin_manager_telemetry_benchmarks.ts b/tests/plugin_manager_telemetry_benchmarks.ts index 9bd7a8b5..e0187f6c 100644 --- a/tests/plugin_manager_telemetry_benchmarks.ts +++ b/tests/plugin_manager_telemetry_benchmarks.ts @@ -1,12 +1,12 @@ /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - + Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/tests/unit/internal_pool_connection_provider.test.ts b/tests/unit/internal_pool_connection_provider.test.ts index 5e5d6f0c..101bddef 100644 --- a/tests/unit/internal_pool_connection_provider.test.ts +++ b/tests/unit/internal_pool_connection_provider.test.ts @@ -19,24 +19,27 @@ import { HostInfo } from "../../common/lib/host_info"; import { HostInfoBuilder } from "../../common/lib/host_info_builder"; import { HostRole } from "../../common/lib/host_role"; import { PluginService } from "../../common/lib/plugin_service"; -import { AwsMySQLClient } from "../../mysql/lib"; import { anything, instance, mock, reset, spy, when } from "ts-mockito"; import { HostListProviderService } from "../../common/lib/host_list_provider_service"; import { SimpleHostAvailabilityStrategy } from "../../common/lib/host_availability/simple_host_availability_strategy"; -import { MySQLDatabaseDialect } from "../../mysql/lib/dialect/mysql_database_dialect"; import { HostListProvider } from "../../common/lib/host_list_provider/host_list_provider"; import { WrapperProperties } from "../../common/lib/wrapper_property"; import { InternalPooledConnectionProvider } from "../../common/lib/internal_pooled_connection_provider"; import { AwsPoolConfig } from "../../common/lib/aws_pool_config"; import { ConnectionProviderManager } from "../../common/lib/connection_provider_manager"; import { RdsUtils } from "../../common/lib/utils/rds_utils"; -import { AwsMysqlPoolClient } from "../../mysql/lib/mysql_pool_client"; import { PoolKey } from "../../common/lib/utils/pool_key"; import { InternalPoolMapping } from "../../common/lib/utils/internal_pool_mapping"; import { SlidingExpirationCache } from "../../common/lib/utils/sliding_expiration_cache"; +import { AwsMysqlPoolClient } from "../../mysql/lib/mysql_pool_client"; +import { AwsMySQLClient } from "../../mysql/lib"; +import { MySQLDatabaseDialect } from "../../mysql/lib/dialect/mysql_database_dialect"; import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect"; import { PoolClientWrapper } from "../../common/lib/pool_client_wrapper"; +const user1 = "user1"; +const user2 = "user2"; +const internalPoolWithOneConnection = mock(AwsMysqlPoolClient); const props: Map = new Map(); const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() }); const writerHost = builder.withHost("writer-host").withRole(HostRole.WRITER).build(); @@ -49,13 +52,32 @@ const readerHost1Connection = builder.withHost(readerUrl1Connection).withPort(54 const readerHost2Connection = builder.withHost(readerUrl2Connection).withPort(5432).withRole(HostRole.READER).build(); const writerHostNoConnection = builder.withHost(writerUrlNoConnections).withPort(5432).withRole(HostRole.WRITER).build(); const testHostsList = [writerHostNoConnection, readerHost1Connection, readerHost2Connection]; -const target: SlidingExpirationCache = new SlidingExpirationCache(BigInt(10000)); +function getTestPoolMap() { + const target: SlidingExpirationCache = new SlidingExpirationCache(BigInt(10000000)); + target.computeIfAbsent( + new PoolKey(readerHost1Connection.url, user1).getPoolKeyString(), + () => instance(internalPoolWithOneConnection), + BigInt(10000000) + ); + target.computeIfAbsent( + new PoolKey(readerHost2Connection.url, user2).getPoolKeyString(), + () => instance(internalPoolWithOneConnection), + BigInt(10000000) + ); + + target.computeIfAbsent( + new PoolKey(readerHost2Connection.url, user1).getPoolKeyString(), + () => instance(internalPoolWithOneConnection), + BigInt(10000000) + ); + return target; +} const defaultHosts = [writerHost, readerHost1, readerHost2]; const mockPluginService: PluginService = mock(PluginService); const mockReaderClient: AwsClient = mock(AwsMySQLClient); const mockWriterClient: AwsClient = mock(AwsMySQLClient); -const mockMySQLClient: AwsClient = mock(AwsMySQLClient); +const mockAwsMySQLClient: AwsClient = mock(AwsMySQLClient); const mockHostInfo: HostInfo = mock(HostInfo); const mockHostListProviderService: HostListProviderService = mock(); const mockHostListProvider: HostListProvider = mock(); @@ -69,7 +91,7 @@ const mockRdsUtils = mock(RdsUtils); const mockPoolConfig = mock(AwsPoolConfig); const mockDialectInstance = instance(mockDialect); -describe("reader write splitting test", () => { +describe("internal pool connection provider test", () => { beforeEach(() => { when(mockPluginService.getHostListProvider()).thenReturn(instance(mockHostListProvider)); when(mockPluginService.getHosts()).thenReturn(defaultHosts); @@ -82,16 +104,24 @@ describe("reader write splitting test", () => { afterEach(() => { reset(mockReaderClient); - reset(mockMySQLClient); + reset(mockAwsMySQLClient); reset(mockHostInfo); reset(mockPluginService); reset(mockHostListProviderService); - reset(mockReaderClient); reset(mockWriterClient); reset(mockClosedReaderClient); reset(mockClosedWriterClient); + reset(mockPoolConfig); + reset(mockAwsPoolClient); + reset(mockDriverDialect); + reset(mockClosedReaderClient); + reset(mockClosedWriterClient); + reset(mockRdsUtils); + reset(mockHostListProvider); + + InternalPooledConnectionProvider.clearDatabasePools(); + ConnectionProviderManager.resetProvider(); }); - const mockRdsUtilsInstance = instance(mockRdsUtils); it("test connect with default mapping", async () => { const mockPluginServiceInstance = instance(mockPluginService); @@ -102,6 +132,7 @@ describe("reader write splitting test", () => { props.set(WrapperProperties.USER.name, "mySqlUser"); props.set(WrapperProperties.PASSWORD.name, "mySqlPassword"); + when(mockRdsUtils.isRdsDns(anything())).thenReturn(false); when(mockRdsUtils.isGreenInstance(anything())).thenReturn(null); when(mockRdsUtils.isRdsInstance("instance1")).thenReturn(true); const config = { @@ -116,16 +147,13 @@ describe("reader write splitting test", () => { when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper); await providerSpy.connect(hostInfo, mockPluginServiceInstance, props); - const expectedKeys: Set = new Set(); - expectedKeys.add(new PoolKey("instance1/", "mySqlUser")); + const expectedKeys: Set = new Set(); + expectedKeys.add(new PoolKey("instance1/", "mySqlUser").getPoolKeyString()); const expectedHosts = new Set(); expectedHosts.add("instance1/"); expect(providerSpy.getHostCount()).toBe(1); expect(providerSpy.getKeySet()).toEqual(expectedKeys); - expect(providerSpy.getHostUrlSet()).toEqual(expectedHosts); - await ConnectionProviderManager.releaseResources(); - ConnectionProviderManager.resetProvider(); }); it("test connect with custom mapping", async () => { @@ -134,9 +162,10 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(hostInfo); when(mockRdsUtils.isRdsInstance("instance1")).thenReturn(true); - props.set(WrapperProperties.USER.name, "mysqlUser"); - props.set(WrapperProperties.PASSWORD.name, "mysqlPassword"); + props.set(WrapperProperties.USER.name, "mySqlUser"); + props.set(WrapperProperties.PASSWORD.name, "mySqlPassword"); + when(mockRdsUtils.isRdsDns(anything())).thenReturn(false); when(mockRdsUtils.isGreenInstance(anything())).thenReturn(null); when(mockRdsUtils.isRdsInstance("instance1")).thenReturn(true); when(mockPluginService.getDialect()).thenReturn(mockDialect); @@ -157,26 +186,32 @@ describe("reader write splitting test", () => { when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper); await providerSpy.connect(hostInfo, mockPluginServiceInstance, props); - const expectedKeys: Set = new Set(); - expectedKeys.add(new PoolKey("instance1/", "instance1/someKey")); + const expectedKeys: Set = new Set(); + expectedKeys.add(new PoolKey("instance1/", "instance1/someKey").getPoolKeyString()); const expectedHosts = new Set(); expectedHosts.add("instance1/"); - expect(providerSpy.getHostCount()).toBe(1); expect(providerSpy.getKeySet()).toEqual(expectedKeys); - expect(providerSpy.getHostUrlSet()).toEqual(expectedHosts); - await ConnectionProviderManager.releaseResources(); - ConnectionProviderManager.resetProvider(); + expect(providerSpy.getHostCount()).toBe(1); }); it("test random strategy", async () => { const provider = spy(new InternalPooledConnectionProvider(mockPoolConfig)); const providerSpy = instance(provider); - providerSpy.setDatabasePools(target); + providerSpy.setDatabasePools(getTestPoolMap()); const selectedHost = providerSpy.getHostInfoByStrategy(testHostsList, HostRole.READER, "random", props); expect(selectedHost.host === readerUrl1Connection || selectedHost.host === readerUrl2Connection).toBeTruthy(); - await ConnectionProviderManager.releaseResources(); - ConnectionProviderManager.resetProvider(); + }); + + it("test least connection strategy", async () => { + const provider = spy(new InternalPooledConnectionProvider(mockPoolConfig)); + const providerSpy = instance(provider); + providerSpy.setDatabasePools(getTestPoolMap()); + when(internalPoolWithOneConnection.getActiveCount()).thenReturn(1); + + const selectedHost = providerSpy.getHostInfoByStrategy(testHostsList, HostRole.READER, "leastConnections", props); + // other reader has 2 connections + expect(selectedHost.host).toEqual(readerUrl1Connection); }); it("test connect to deleted instance", async () => { @@ -186,6 +221,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(hostInfo); when(mockPluginService.getDialect()).thenReturn(mockDialectInstance); + when(mockRdsUtils.isRdsDns(anything())).thenReturn(false); when(mockRdsUtils.isGreenInstance(anything())).thenReturn(null); when(mockRdsUtils.isRdsInstance("instance1")).thenReturn(true); when(mockDriverDialect.preparePoolClientProperties(anything(), anything())).thenReturn(props); @@ -196,7 +232,5 @@ describe("reader write splitting test", () => { when(await provider.getPoolConnection(anything(), anything())).thenReturn(mockPoolClientWrapper); await expect(providerSpy.connect(hostInfo, mockPluginServiceInstance, props)).rejects.toThrow("testError"); - await ConnectionProviderManager.releaseResources(); - ConnectionProviderManager.resetProvider(); }); });