Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: connection tracker update after failover to new connection #356

Merged
merged 5 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class ConnectionStringHostListProvider implements StaticHostListProvider
return Promise.resolve(this.hostList);
}

getHostRole(client: AwsClient): Promise<HostRole> {
getHostRole(client: ClientWrapper): Promise<HostRole> {
throw new AwsWrapperError("ConnectionStringHostListProvider does not support getHostRole.");
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/host_list_provider/host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export interface HostListProvider {

forceRefresh(client: ClientWrapper): Promise<HostInfo[]>;

getHostRole(client: AwsClient, dialect: DatabaseDialect): Promise<HostRole>;
getHostRole(client: ClientWrapper, dialect: DatabaseDialect): Promise<HostRole>;

identifyConnection(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo | null>;

Expand Down
8 changes: 4 additions & 4 deletions common/lib/host_list_provider/rds_host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ export class RdsHostListProvider implements DynamicHostListProvider {
throw new AwsWrapperError("Could not retrieve targetClient.");
}

async getHostRole(client: AwsClient, dialect: DatabaseDialect): Promise<HostRole> {
async getHostRole(client: ClientWrapper, dialect: DatabaseDialect): Promise<HostRole> {
if (!this.isTopologyAwareDatabaseDialect(dialect)) {
throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect"));
}

if (client.targetClient) {
return dialect.getHostRole(client.targetClient);
if (client) {
return await dialect.getHostRole(client);
} else {
throw new AwsWrapperError(Messages.get("AwsClient targetClient not defined."));
throw new AwsWrapperError(Messages.get("AwsClient.targetClientNotDefined"));
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import { ClientWrapper } from "./client_wrapper";
import { logger } from "../logutils";
import { Messages } from "./utils/messages";
import { DatabaseDialectCodes } from "./database_dialect/database_dialect_codes";
import { getWriter } from "./utils/utils";
import { getWriter, logTopology } from "./utils/utils";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { ConfigurationProfile } from "./profile/configuration_profile";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl

private async checkWriterChanged(): Promise<void> {
const hostInfoAfterFailover = this.getWriter(this.pluginService.getHosts());

if (this.currentWriter === null) {
this.currentWriter = hostInfoAfterFailover;
this.needUpdateCurrentWriter = false;
Expand All @@ -127,10 +126,12 @@ export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl
async notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): Promise<void> {
for (const [key, _] of changes.entries()) {
const hostChanges = changes.get(key);

if (hostChanges) {
if (hostChanges.has(HostChangeOptions.PROMOTED_TO_READER)) {
await this.tracker.invalidateAllConnectionsMultipleHosts(key);
}

if (hostChanges.has(HostChangeOptions.PROMOTED_TO_WRITER)) {
this.needUpdateCurrentWriter = true;
}
Expand Down
31 changes: 19 additions & 12 deletions common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance";
private static readonly TELEMETRY_READER_FAILOVER = "failover to replica";
private static readonly METHOD_END = "end";

private static readonly subscribedMethods: Set<string> = new Set([
"initHostProvider",
"connect",
Expand Down Expand Up @@ -344,17 +345,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
} else {
await this.failoverReader(failedHost);
}

if (this._isInTransaction || this.pluginService.isInTransaction()) {
// "Transaction resolution unknown. Please re-configure session state if required and try
// restarting transaction."
logger.debug(Messages.get("Failover.transactionResolutionUnknownError"));
throw new TransactionResolutionUnknownError(Messages.get("Failover.transactionResolutionUnknownError"));
} else {
// "The active SQL connection has changed due to a connection failure. Please re-configure
// session state if required."
throw new FailoverSuccessError(Messages.get("Failover.connectionChangedError"));
}
this.throwFailoverSuccessError();
}

async failoverReader(failedHostInfo: HostInfo) {
Expand Down Expand Up @@ -392,7 +383,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
await this.pluginService.setCurrentClient(result.client, result.newHost);
this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.updateTopology(true);
this.failoverReaderSuccessCounter.inc();
joyc-bq marked this conversation as resolved.
Show resolved Hide resolved
this.failoverWriterSuccessCounter.inc();
} catch (error: any) {
this.failoverReaderFailedCounter.inc();
throw error;
Expand All @@ -405,6 +396,21 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
}
}

throwFailoverSuccessError() {
if (this._isInTransaction || this.pluginService.isInTransaction()) {
this.pluginService.setInTransaction(false);

// "Transaction resolution unknown. Please re-configure session state if required and try
// restarting transaction."
logger.debug(Messages.get("Failover.transactionResolutionUnknownError"));
throw new TransactionResolutionUnknownError(Messages.get("Failover.transactionResolutionUnknownError"));
} else {
// "The active SQL connection has changed due to a connection failure. Please re-configure
// session state if required."
throw new FailoverSuccessError(Messages.get("Failover.connectionChangedError"));
}
}

async failoverWriter() {
logger.debug(Messages.get("Failover.startWriterFailover"));

Expand Down Expand Up @@ -439,6 +445,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
await this.pluginService.setCurrentClient(result.client, writerHostInfo);
logger.debug(Messages.get("Failover.establishedConnection", this.pluginService.getCurrentHostInfo()?.host ?? ""));
await this.pluginService.refreshHostList();
this.throwFailoverSuccessError();
this.failoverWriterSuccessCounter.inc();
joyc-bq marked this conversation as resolved.
Show resolved Hide resolved
} catch (error: any) {
this.failoverWriterFailedCounter.inc();
Expand Down
29 changes: 11 additions & 18 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { HostInfo } from "../../host_info";
import { PluginService } from "../../plugin_service";
import { ReaderFailoverResult } from "./reader_failover_result";
import { getTimeoutTask, maskProperties, shuffleList, sleep } from "../../utils/utils";
import { getTimeoutTask, logAndThrowError, maskProperties, shuffleList, sleep } from "../../utils/utils";
import { HostRole } from "../../host_role";
import { HostAvailability } from "../../host_availability/host_availability";
import { AwsWrapperError, InternalQueryTimeoutError } from "../../utils/errors";
Expand Down Expand Up @@ -110,26 +110,16 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
return result; // connection to any host is acceptable
}

// ensure new connection is to a reader host
// Ensure new connection is to a reader host
await this.pluginService.refreshHostList();
const topology = this.pluginService.getHosts();

for (let i = 0; i < topology.length; i++) {
const host = topology[i];
if (host.host === result.newHost.host) {
// found new connection host in the latest topology
if (host.role === HostRole.READER) {
return result;
}
try {
if ((await this.pluginService.getHostRole(result.client)) !== HostRole.READER) {
return result;
}
} catch (error) {
logger.debug(Messages.get("ClusterAwareReaderFailoverHandler.errorGettingHostRole", error.message));
}

// New host is not found in the latest topology. There are few possible reasons for that.
// - Host is not yet presented in the topology due to failover process in progress
// - Host is in the topology but its role isn't a
// READER (that is not acceptable option due to this.strictReader setting)
// Need to continue this loop and to make another try to connect to a reader.

try {
await this.pluginService.abortTargetClient(result.client);
} catch (error) {
Expand Down Expand Up @@ -276,7 +266,9 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler

const hostsByPriority: HostInfo[] = [...activeReaders];
const numReaders: number = activeReaders.length + downHostList.length;
if (writerHost && (!this.enableFailoverStrictReader || numReaders === 0)) {
// Since the writer instance may change during failover, the original writer is likely now a reader. We will include
// it and then verify the role once connected if using "strict-reader".
if (writerHost || numReaders === 0) {
hostsByPriority.push(writerHost);
}
hostsByPriority.push(...downHostList);
Expand Down Expand Up @@ -322,6 +314,7 @@ class ConnectionAttemptTask {
);
try {
this.targetClient = await this.pluginService.forceConnect(this.newHost, copy);

this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.AVAILABLE);
logger.info(Messages.get("ClusterAwareReaderFailoverHandler.successfulReaderConnection", this.newHost.host));
if (this.taskHandler.getSelectedConnectionAttemptTask(this.failoverTaskId) === -1) {
Expand Down
4 changes: 3 additions & 1 deletion common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"ClusterAwareWriterFailoverHandler.standaloneHost": "[TaskB] Host %s is not yet connected to a cluster. The cluster is still being reconfigured.",
"ClusterAwareWriterFailoverHandler.taskBAttemptConnectionToNewWriter": "[TaskB] Trying to connect to a new writer: '%s'",
"ClusterAwareWriterFailoverHandler.alreadyWriter": "Current reader connection is actually a new writer connection.",
"ClusterAwareReaderFailoverHandler.errorGettingHostRole": "An error occurred while trying to determine the role of the reader candidate: %s.",
"Failover.TransactionResolutionUnknownError": "Transaction resolution unknown. Please re-configure session state if required and try restarting the transaction.",
"Failover.connectionChangedError": "The active SQL connection has changed due to a connection failure. Please re-configure session state if required.",
"Failover.parameterValue": "%s = %s",
Expand Down Expand Up @@ -193,5 +194,6 @@
"ConfigurationProfileBuilder.notFound": "Configuration profile '%s' not found.",
"ConfigurationProfileBuilder.profileNameRequired": "Profile name is required.",
"ConfigurationProfileBuilder.canNotUpdateKnownPreset": "Can't add or update a built-in preset configuration profile '%s'.",
"AwsClient.configurationProfileNotFound": "Configuration profile '%s' not found."
"AwsClient.configurationProfileNotFound": "Configuration profile '%s' not found.",
"AwsClient.targetClientNotDefined": "AwsClient targetClient not defined."
}
2 changes: 1 addition & 1 deletion pg/lib/dialect/aurora_pg_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class AuroraPgDatabaseDialect extends PgDatabaseDialect implements Topolo

async getHostRole(targetClient: ClientWrapper): Promise<HostRole> {
const res = await targetClient.query(AuroraPgDatabaseDialect.IS_READER_QUERY);
return Promise.resolve(res.rows[0]["is_reader"] === "true" ? HostRole.READER : HostRole.WRITER);
return Promise.resolve(res.rows[0]["is_reader"] === true ? HostRole.READER : HostRole.WRITER);
}

async isDialect(targetClient: ClientWrapper): Promise<boolean> {
Expand Down
12 changes: 6 additions & 6 deletions tests/unit/failover_plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import { AwsMySQLClient } from "../../mysql/lib";
import { anything, instance, mock, reset, resetCalls, spy, verify, when } from "ts-mockito";
import { Messages } from "../../common/lib/utils/messages";
import { HostChangeOptions } from "../../common/lib/host_change_options";
import { ClientWrapper } from "../../common/lib/client_wrapper";
import { NullTelemetryFactory } from "../../common/lib/utils/telemetry/null_telemetry_factory";
import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper";

Expand Down Expand Up @@ -196,9 +195,10 @@ describe("reader failover handler", () => {
const hostInfo = builder.withHost("hostA").build();
const hosts = [hostInfo];

when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "aslias2"]));
when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "alias2"]));
when(mockHostInfo.getRawAvailability()).thenReturn(HostAvailability.AVAILABLE);
when(mockPluginService.getHosts()).thenReturn(hosts);
when(await mockPluginService.getHostRole(anything())).thenReturn(HostRole.WRITER);

when(mockReaderResult.isConnected).thenReturn(true);
when(mockReaderResult.client).thenReturn(mockClientWrapper);
Expand Down Expand Up @@ -230,7 +230,7 @@ describe("reader failover handler", () => {
const hosts = [hostInfo];
const test = new AwsWrapperError("test");

when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "aslias2"]));
when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "alias2"]));
when(mockHostInfo.getRawAvailability()).thenReturn(HostAvailability.AVAILABLE);
when(mockPluginService.getHosts()).thenReturn(hosts);
when(mockReaderResult.exception).thenReturn(test);
Expand All @@ -254,7 +254,7 @@ describe("reader failover handler", () => {
const hosts = [hostInfo];
const test = new AwsWrapperError("test");

when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "aslias2"]));
when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "alias2"]));
when(mockPluginService.getHosts()).thenReturn(hosts);
when(mockWriterResult.exception).thenReturn(test);
when(mockWriterFailoverHandler.failover(anything())).thenResolve(instance(mockWriterResult));
Expand All @@ -275,7 +275,7 @@ describe("reader failover handler", () => {
const hostInfo = builder.withHost("hostA").build();
const hosts = [hostInfo];

when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "aslias2"]));
when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "alias2"]));
when(mockPluginService.getHosts()).thenReturn(hosts);
when(mockWriterResult.isConnected).thenReturn(false);
when(mockWriterFailoverHandler.failover(anything())).thenResolve(instance(mockWriterResult));
Expand Down Expand Up @@ -305,7 +305,7 @@ describe("reader failover handler", () => {
const hostInfo = builder.withHost("hostA").build();
const hosts = [hostInfo];

when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "aslias2"]));
when(mockHostInfo.allAliases).thenReturn(new Set<string>(["alias1", "alias2"]));
when(mockPluginService.getHosts()).thenReturn(hosts);
when(mockWriterResult.isConnected).thenReturn(false);
when(mockWriterResult.topology).thenReturn(hosts);
Expand Down
9 changes: 7 additions & 2 deletions tests/unit/reader_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ describe("reader failover handler", () => {
const successHostIndex = 3;

when(mockPluginService.getHosts()).thenReturn(hosts);
when(await mockPluginService.getHostRole(anything())).thenReturn(HostRole.READER);

for (let i = 0; i < hosts.length; i++) {
if (i !== successHostIndex) {
when(mockPluginService.forceConnect(hosts[i], anything())).thenThrow(new AwsWrapperError("Rejecting test"));
Expand Down Expand Up @@ -326,6 +328,9 @@ describe("reader failover handler", () => {
const reader = new HostInfo("reader", 1234, HostRole.READER);
const hosts = [writer, reader];
when(mockPluginService.getHosts()).thenReturn(hosts);
when(await mockPluginService.getHostRole(anything()))
.thenReturn(HostRole.READER)
.thenReturn(HostRole.WRITER);

const mockPluginServiceInstance = instance(mockPluginService);
const target = new ClusterAwareReaderFailoverHandler(
Expand All @@ -338,13 +343,13 @@ describe("reader failover handler", () => {

// We expect only reader nodes to be chosen.
let hostsByPriority = target.getHostsByPriority(hosts);
expect(hostsByPriority).toStrictEqual([reader]);
expect(hostsByPriority).toStrictEqual([reader, writer]);

// Should pick the reader even if unavailable.
reader.setAvailability(HostAvailability.NOT_AVAILABLE);

hostsByPriority = target.getHostsByPriority(hosts);
expect(hostsByPriority).toStrictEqual([reader]);
expect(hostsByPriority).toStrictEqual([writer, reader]);

// Writer node will only be picked if it is the only node in topology;
hostsByPriority = target.getHostsByPriority([writer]);
Expand Down
Loading