Skip to content

Commit

Permalink
chore: remove getConnectionProvider from plugin service (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
karenc-bq authored Nov 5, 2024
1 parent 0b05bc7 commit 7177a5c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 32 deletions.
7 changes: 0 additions & 7 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
return this.getCurrentClient().connectionUrlParser;
}

getConnectionProvider(hostInfo: HostInfo | null, props: Map<string, any>): ConnectionProvider {
if (!this.pluginServiceManagerContainer.pluginManager) {
throw new AwsWrapperError("Plugin manager should not be undefined");
}
return this.pluginServiceManagerContainer.pluginManager.getConnectionProvider(hostInfo, props);
}

getDialect(): DatabaseDialect {
return this.dialect;
}
Expand Down
5 changes: 3 additions & 2 deletions common/lib/plugins/read_write_splitting_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { ClientWrapper } from "../client_wrapper";
import { getWriter, logAndThrowError } from "../utils/utils";
import { CanReleaseResources } from "../can_release_resources";
import { InternalPooledConnectionProvider } from "../internal_pooled_connection_provider";
import { PoolClientWrapper } from "../pool_client_wrapper";

export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implements CanReleaseResources {
private static readonly subscribedMethods: Set<string> = new Set(["initHostProvider", "connect", "notifyConnectionChanged", "query"]);
Expand Down Expand Up @@ -197,7 +198,7 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement
props.set(WrapperProperties.HOST.name, writerHost.host);
try {
const targetClient = await this.pluginService.connect(writerHost, props);
this.isWriterClientFromInternalPool = this.pluginService.getConnectionProvider(writerHost, props) instanceof InternalPooledConnectionProvider;
this.isWriterClientFromInternalPool = targetClient instanceof PoolClientWrapper;
this.setWriterClient(targetClient, writerHost);
await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost);
} catch (any) {
Expand Down Expand Up @@ -290,7 +291,7 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement

try {
targetClient = await this.pluginService.connect(host, props);
this.isReaderClientFromInternalPool = this.pluginService.getConnectionProvider(host, props) instanceof InternalPooledConnectionProvider;
this.isReaderClientFromInternalPool = targetClient instanceof PoolClientWrapper;
readerHost = host;
break;
} catch (any) {
Expand Down
49 changes: 26 additions & 23 deletions tests/unit/read_write_splitting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ import { InternalPooledConnectionProvider } from "../../common/lib/internal_pool
import { AwsPoolConfig } from "../../common/lib/aws_pool_config";
import { ConnectionProviderManager } from "../../common/lib/connection_provider_manager";
import { InternalPoolMapping } from "../../common/lib/utils/internal_pool_mapping";
import { NodePostgresDriverDialect } from "../../pg/lib/dialect/node_postgres_driver_dialect";
import { DriverDialect } from "../../common/lib/driver_dialect/driver_dialect";
import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect";
import { PgClientWrapper } from "../../common/lib/pg_client_wrapper";
import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper";
import { PoolClientWrapper } from "../../common/lib/pool_client_wrapper";

const properties: Map<string, any> = new Map();
const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() });
Expand All @@ -63,14 +65,9 @@ const mockDialect: MySQLDatabaseDialect = mock(MySQLDatabaseDialect);
const mockDriverDialect: DriverDialect = mock(MySQL2DriverDialect);
const mockChanges: Set<HostChangeOptions> = mock(Set<HostChangeOptions>);

const clientWrapper: ClientWrapper = {
client: undefined,
hostInfo: mockHostInfo,
properties: new Map<string, any>()
};

const mockReaderWrapper: ClientWrapper = mock(clientWrapper);
const mockWriterWrapper: ClientWrapper = mock(clientWrapper);
const mockReaderWrapper: ClientWrapper = mock(PgClientWrapper);
const mockWriterWrapper: ClientWrapper = mock(MySQLClientWrapper);
const poolClientWrapper: ClientWrapper = new PoolClientWrapper(undefined, writerHost, new Map<string, string>());

const clientWrapper_undefined: any = undefined;

Expand All @@ -89,7 +86,7 @@ describe("reader write splitting test", () => {
when(mockPluginService.isInTransaction()).thenReturn(false);
when(mockPluginService.getDialect()).thenReturn(mockDialect);
when(mockPluginService.getDriverDialect()).thenReturn(mockDriverDialect);
when(mockDriverDialect.connect(anything())).thenReturn(Promise.resolve(mockReaderClient));
when(mockDriverDialect.connect(anything(), anything())).thenReturn(Promise.resolve(mockReaderWrapper));
properties.clear();
});

Expand Down Expand Up @@ -411,11 +408,14 @@ describe("reader write splitting test", () => {
when(mockPluginService.getCurrentClient()).thenReturn(instance(mockWriterClient));
when(await mockWriterClient.isValid()).thenReturn(true);
when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost).thenReturn(writerHost).thenReturn(readerHost1);
when(mockDriverDialect.connect(anything())).thenReturn(Promise.resolve(mockReaderClient));
when(mockPluginService.connect(anything(), anything())).thenResolve(mockReaderWrapper);
const config: AwsPoolConfig = new AwsPoolConfig({ idleTimeoutMillis: 7000, maxConnections: 10, maxIdleConnections: 10 });
when(mockDriverDialect.connect(anything(), anything())).thenReturn(Promise.resolve(poolClientWrapper));
when(mockPluginService.connect(anything(), anything())).thenResolve(poolClientWrapper);
const config: AwsPoolConfig = new AwsPoolConfig({
idleTimeoutMillis: 7000,
maxConnections: 10,
maxIdleConnections: 10
});
const provider: InternalPooledConnectionProvider = new InternalPooledConnectionProvider(config);
when(mockPluginService.getConnectionProvider(anything(), anything())).thenReturn(provider);

ConnectionProviderManager.setConnectionProvider(provider);

Expand All @@ -426,7 +426,7 @@ describe("reader write splitting test", () => {
const spyTarget = instance(target);
await spyTarget.switchClientIfRequired(true);
await spyTarget.switchClientIfRequired(false);
verify(target.closeTargetClientIfIdle(mockReaderWrapper)).once();
verify(target.closeTargetClientIfIdle(poolClientWrapper)).once();
});

it("test pooled writer connection after set read only", async () => {
Expand All @@ -441,19 +441,22 @@ describe("reader write splitting test", () => {
.thenReturn(readerHost1)
.thenReturn(readerHost1)
.thenReturn(writerHost);
when(mockDriverDialect.connect(anything())).thenReturn(Promise.resolve(mockWriterClient));
when(mockPluginService.connect(writerHost, anything())).thenResolve(mockWriterWrapper);
when(mockPluginService.connect(readerHost1, anything())).thenResolve(mockReaderWrapper);
when(mockPluginService.connect(readerHost2, anything())).thenResolve(mockReaderWrapper);

const config: AwsPoolConfig = new AwsPoolConfig({ idleTimeoutMillis: 7000, maxConnections: 10, maxIdleConnections: 10 });
when(mockDriverDialect.connect(anything(), anything())).thenReturn(Promise.resolve(poolClientWrapper));
when(mockPluginService.connect(writerHost, anything())).thenResolve(poolClientWrapper);
when(mockPluginService.connect(readerHost1, anything())).thenResolve(poolClientWrapper);
when(mockPluginService.connect(readerHost2, anything())).thenResolve(poolClientWrapper);

const config: AwsPoolConfig = new AwsPoolConfig({
idleTimeoutMillis: 7000,
maxConnections: 10,
maxIdleConnections: 10
});
const myKeyFunc: InternalPoolMapping = {
getKey: (hostInfo: HostInfo, props: Map<string, any>) => {
return hostInfo.url + "someKey";
}
};
const provider: InternalPooledConnectionProvider = new InternalPooledConnectionProvider(config, myKeyFunc);
when(mockPluginService.getConnectionProvider(anything(), anything())).thenReturn(provider);

ConnectionProviderManager.setConnectionProvider(provider);

Expand All @@ -472,6 +475,6 @@ describe("reader write splitting test", () => {
await spyTarget.switchClientIfRequired(false);
await spyTarget.switchClientIfRequired(true);

verify(target.closeTargetClientIfIdle(mockWriterWrapper)).once();
verify(target.closeTargetClientIfIdle(poolClientWrapper)).twice();
});
});

0 comments on commit 7177a5c

Please sign in to comment.