Skip to content

Commit

Permalink
feat: least connections strategy and fix databasePools computeIfAbsent (
Browse files Browse the repository at this point in the history
  • Loading branch information
joyc-bq authored Nov 6, 2024
1 parent e7c31a3 commit 323544b
Show file tree
Hide file tree
Showing 17 changed files with 331 additions and 95 deletions.
2 changes: 2 additions & 0 deletions common/lib/aws_pool_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ export interface AwsPoolClient {
getIdleCount(): number;

getTotalCount(): number;

getActiveCount(): number;
}
78 changes: 39 additions & 39 deletions common/lib/internal_pooled_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,41 @@ import { Messages } from "./utils/messages";
import { HostSelector } from "./host_selector";
import { RandomHostSelector } from "./random_host_selector";
import { InternalPoolMapping } from "./utils/internal_pool_mapping";
import { logger } from "../logutils";
import { RoundRobinHostSelector } from "./round_robin_host_selector";
import { AwsPoolClient } from "./aws_pool_client";
import { AwsPoolConfig } from "./aws_pool_config";
import { LeastConnectionsHostSelector } from "./least_connections_host_selector";
import { PoolClientWrapper } from "./pool_client_wrapper";
import { logger } from "../logutils";

export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources {
private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
[RandomHostSelector.STRATEGY_NAME, new RandomHostSelector()],
[RoundRobinHostSelector.STRATEGY_NAME, new RoundRobinHostSelector()]
]);
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 60 minutes
static readonly POOL_EXPIRATION_NANOS: bigint = BigInt(30_000_000_000); // 30 minutes
protected static databasePools: SlidingExpirationCache<string, any> = new SlidingExpirationCache(
InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS,
(pool: any) => pool.getActiveCount() === 0,
(pool: any) => pool.end()
);

private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
[RandomHostSelector.STRATEGY_NAME, new RandomHostSelector()],
[RoundRobinHostSelector.STRATEGY_NAME, new RoundRobinHostSelector()],
[LeastConnectionsHostSelector.STRATEGY_NAME, new LeastConnectionsHostSelector(InternalPooledConnectionProvider.databasePools)]
]);
private readonly rdsUtil: RdsUtils = new RdsUtils();
private readonly _poolMapping?: InternalPoolMapping;
private readonly _poolConfig?: AwsPoolConfig;
targetClient?: ClientWrapper;
internalPool: AwsPoolClient | undefined;

protected databasePools: SlidingExpirationCache<PoolKey, any> = new SlidingExpirationCache(
InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS,
(pool: any) => pool.getIdleCount() === pool.getTotalCount(),
(pool: any) => pool.end()
);

poolExpirationCheckNanos: bigint = InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS; // 30 minutes
private static poolExpirationCheckNanos: bigint = InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS; // 30 minutes

constructor(poolConfig?: AwsPoolConfig);
constructor(poolConfig?: AwsPoolConfig, mapping?: InternalPoolMapping);
constructor(poolConfig?: AwsPoolConfig, mapping?: InternalPoolMapping, poolExpirationNanos?: bigint, poolCleanupNanos?: bigint) {
this._poolMapping = mapping;
this.poolExpirationCheckNanos = poolExpirationNanos ?? InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS;
this.databasePools.cleanupIntervalNs = poolCleanupNanos ?? InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS;
InternalPooledConnectionProvider.poolExpirationCheckNanos = poolExpirationNanos ?? InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS;
InternalPooledConnectionProvider.databasePools.cleanupIntervalNs = poolCleanupNanos ?? InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS;
this._poolConfig = poolConfig ?? new AwsPoolConfig({});
}

Expand Down Expand Up @@ -107,25 +108,30 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide

const dialect = pluginService.getDriverDialect();
const preparedConfig = dialect.preparePoolClientProperties(props, this._poolConfig);

this.internalPool = this.databasePools.computeIfAbsent(
new PoolKey(connectionHostInfo.url, this.getPoolKey(connectionHostInfo, props)),
this.internalPool = InternalPooledConnectionProvider.databasePools.computeIfAbsent(
new PoolKey(connectionHostInfo.url, this.getPoolKey(connectionHostInfo, props)).getPoolKeyString(),
() => dialect.getAwsPoolClient(preparedConfig),
this.poolExpirationCheckNanos
InternalPooledConnectionProvider.poolExpirationCheckNanos
);

return await this.getPoolConnection(hostInfo, props);
return await this.getPoolConnection(connectionHostInfo, props);
}

async getPoolConnection(hostInfo: HostInfo, props: Map<string, string>) {
return new PoolClientWrapper(await this.internalPool!.connect(), hostInfo, props);
}

public async releaseResources() {
for (const [key, val] of this.databasePools.entries) {
val.item.releaseResources();
async releaseResources() {
for (const [_key, value] of InternalPooledConnectionProvider.databasePools.entries) {
if (value.item) {
await value.item.releaseResources();
}
}
this.databasePools.clear();
InternalPooledConnectionProvider.clearDatabasePools();
}

static clearDatabasePools() {
InternalPooledConnectionProvider.databasePools.clear();
}

getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {
Expand All @@ -140,36 +146,30 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
return promisify(lookup)(host, {});
}

getHostUrlSet(): Set<string> {
const hostUrls: Set<string> = new Set<string>();
for (const [key, _val] of this.databasePools.entries) {
hostUrls.add(key.getUrl());
}
return hostUrls;
}

getHostCount() {
return this.databasePools.size;
return InternalPooledConnectionProvider.databasePools.size;
}

getKeySet(): Set<PoolKey> {
return new Set<PoolKey>(this.databasePools.keys);
getKeySet(): Set<string> {
return new Set<string>(InternalPooledConnectionProvider.databasePools.keys);
}

getPoolKey(hostInfo: HostInfo, props: Map<string, any>) {
return this._poolMapping?.getKey(hostInfo, props) ?? WrapperProperties.USER.get(props);
}

logConnections() {
if (this.databasePools.size === 0) {
if (InternalPooledConnectionProvider.databasePools.size === 0) {
return;
}

const l = Array.from(this.databasePools.entries).map(([v, k]) => [v.getPoolKeyString(), k.item.constructor.name]);
logger.debug(`Internal Pooled Connection: [${JSON.stringify(l)}]`);
const connections = Array.from(InternalPooledConnectionProvider.databasePools.entries).map(([v, k]) => [v, k.item.constructor.name]);
logger.debug(`Internal Pooled Connections: [\r\n${connections.join("\r\n")}\r\n]`);
}

setDatabasePools(connectionPools: SlidingExpirationCache<PoolKey, any>): void {
this.databasePools = connectionPools;
// for testing only
setDatabasePools(connectionPools: SlidingExpirationCache<string, any>): void {
InternalPooledConnectionProvider.databasePools = connectionPools;
LeastConnectionsHostSelector.setDatabasePools(connectionPools);
}
}
64 changes: 64 additions & 0 deletions common/lib/least_connections_host_selector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { HostSelector } from "./host_selector";
import { HostInfo } from "./host_info";
import { HostRole } from "./host_role";
import { AwsWrapperError } from "./utils/errors";
import { HostAvailability } from "./host_availability/host_availability";
import { Messages } from "./utils/messages";
import { SlidingExpirationCache } from "./utils/sliding_expiration_cache";

export class LeastConnectionsHostSelector implements HostSelector {
protected static databasePools: SlidingExpirationCache<string, any>;
static readonly STRATEGY_NAME = "leastConnections";

constructor(databasePools: SlidingExpirationCache<string, any>) {
LeastConnectionsHostSelector.databasePools = databasePools;
}

getHost(hosts: HostInfo[], role: HostRole, props?: Map<string, any>): HostInfo {
const eligibleHosts: HostInfo[] = hosts
.filter((host: HostInfo) => host.role === role && host.availability === HostAvailability.AVAILABLE)
.sort((hostA: HostInfo, hostB: HostInfo) => {
const hostACount = this.getNumConnections(hostA, LeastConnectionsHostSelector.databasePools);
const hostBCount = this.getNumConnections(hostB, LeastConnectionsHostSelector.databasePools);
return hostACount < hostBCount ? -1 : hostACount > hostBCount ? 1 : 0;
});

if (eligibleHosts.length === 0) {
throw new AwsWrapperError(Messages.get("HostSelector.noHostsMatchingRole", role));
}
return eligibleHosts[0];
}

getNumConnections(hostInfo: HostInfo, databasePools: SlidingExpirationCache<string, any>): number {
let numConnections: number = 0;
const url: string = hostInfo.url;
for (const [key, val] of databasePools.map.entries()) {
if (!key.includes(url)) {
continue;
}
numConnections += val.item.getActiveCount();
}
return numConnections;
}

// for testing purposes only
static setDatabasePools(connectionPools: SlidingExpirationCache<string, any>): void {
LeastConnectionsHostSelector.databasePools = connectionPools;
}
}
6 changes: 3 additions & 3 deletions common/lib/mysql_client_wrapper.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
6 changes: 3 additions & 3 deletions common/lib/pg_client_wrapper.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
6 changes: 3 additions & 3 deletions common/lib/pool_client_wrapper.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
6 changes: 5 additions & 1 deletion common/lib/utils/pool_key.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ export class PoolKey {
return this.url;
}

getExtraKey(): string {
return this.extraKey;
}

getPoolKeyString(): string {
return "PoolKey [url=" + this.url + ", extraKey=" + this.extraKey + "]";
return `PoolKey [url=${this.getUrl()}, extraKey=${this.getExtraKey()}]`;
}
}
1 change: 1 addition & 0 deletions common/lib/utils/sliding_expiration_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export class SlidingExpirationCache<K, V> {
get keys() {
return this.map.keys();
}

set cleanupIntervalNs(value: bigint) {
this._cleanupIntervalNanos = value;
}
Expand Down
6 changes: 3 additions & 3 deletions mysql/lib/dialect/mysql2_driver_dialect.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
4 changes: 4 additions & 0 deletions mysql/lib/mysql_pool_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ export class AwsMysqlPoolClient implements AwsPoolClient {
return this.targetPool.pool._allConnections.length;
}

getActiveCount(): number {
return this.getTotalCount() - this.getIdleCount();
}

async releaseResources(): Promise<void> {
await this.targetPool.end();
}
Expand Down
4 changes: 4 additions & 0 deletions pg/lib/pg_pool_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export class AwsPgPoolClient implements AwsPoolClient {
return this.targetPool.totalCount;
}

getActiveCount(): number {
return this.getTotalCount() - this.getIdleCount();
}

async releaseResources(): Promise<void> {
await this.targetPool.end();
}
Expand Down
Loading

0 comments on commit 323544b

Please sign in to comment.