Skip to content

Commit

Permalink
feat: add error listeners for the error events
Browse files Browse the repository at this point in the history
  • Loading branch information
karenc-bq committed Nov 6, 2024
1 parent e7c31a3 commit b908985
Show file tree
Hide file tree
Showing 16 changed files with 654 additions and 2,891 deletions.
15 changes: 5 additions & 10 deletions common/lib/driver_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { AwsWrapperError } from "./utils/errors";
import { WrapperProperties } from "./wrapper_property";
import { Messages } from "./utils/messages";
import { RdsUtils } from "./utils/rds_utils";
import { HostInfoBuilder } from "./host_info_builder";
import { promisify } from "util";
import { lookup } from "dns";
import { PluginService } from "./plugin_service";
Expand Down Expand Up @@ -51,17 +50,13 @@ export class DriverConnectionProvider implements ConnectionProvider {
async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ClientWrapper> {
let resultTargetClient;
const resultProps = new Map(props);
let connectionHostInfo: HostInfo;

resultProps.set(WrapperProperties.HOST.name, hostInfo.host);
if (hostInfo.isPortSpecified()) {
resultProps.set(WrapperProperties.PORT.name, hostInfo.port);
}
const driverDialect: DriverDialect = pluginService.getDriverDialect();
try {
const targetClient: any = await driverDialect.connect(hostInfo, props);
connectionHostInfo = new HostInfoBuilder({
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
})
.copyFrom(hostInfo)
.build();
resultTargetClient = targetClient;
resultTargetClient = await driverDialect.connect(hostInfo, resultProps);
} catch (e: any) {
if (!WrapperProperties.ENABLE_GREEN_HOST_REPLACEMENT.get(props)) {
throw e;
Expand Down
28 changes: 28 additions & 0 deletions common/lib/error_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,35 @@
limitations under the License.
*/

import { ClientWrapper } from "./client_wrapper";

export interface ErrorHandler {
isLoginError(e: Error): boolean;

isNetworkError(e: Error): boolean;

/**
* Checks whether there has been an unexpected error emitted and if the error is a type of login error.
*/
hasLoginError(): boolean;

/**
* Checks whether there has been an unexpected error emitted and if the error is a type of network error.
*/
hasNetworkError(): boolean;

getUnexpectedError(): Error | null;

/**
* Attach an error event listener to the event emitter object in the ClientWrapper.
* The listener will track the latest error emitted to be handled in the future.
* @param clientWrapper a wrapper containing the target community client.
*/
attachErrorListener(clientWrapper: ClientWrapper | undefined): void;

/**
* Attach a No-Op error listener that ignores any error emitted.
* @param clientWrapper a wrapper containing the target community client.
*/
attachNoOpErrorListener(clientWrapper: ClientWrapper | undefined): void;
}
38 changes: 30 additions & 8 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this._isInTransaction = inTransaction;
}

isLoginError(e: Error): boolean {
return this.getCurrentClient().errorHandler.isLoginError(e);
}

isNetworkError(e: Error): boolean {
return this.getCurrentClient().errorHandler.isNetworkError(e);
}

getHostListProvider(): HostListProvider | null {
return this._hostListProvider ? this._hostListProvider : null;
}
Expand Down Expand Up @@ -479,4 +471,34 @@ export class PluginService implements ErrorHandler, HostListProviderService {
getTelemetryFactory(): TelemetryFactory {
return this.pluginServiceManagerContainer.pluginManager!.getTelemetryFactory();
}

/* Error Handler interface implementation */

isLoginError(e: Error): boolean {
return this.getCurrentClient().errorHandler.isLoginError(e);
}

isNetworkError(e: Error): boolean {
return this.getCurrentClient().errorHandler.isNetworkError(e);
}

hasLoginError(): boolean {
return this.getCurrentClient().errorHandler.hasLoginError();
}

hasNetworkError(): boolean {
return this.getCurrentClient().errorHandler.hasNetworkError();
}

getUnexpectedError(): Error | null {
return this.getCurrentClient().errorHandler.getUnexpectedError();
}

attachErrorListener(clientWrapper: ClientWrapper | undefined): void {
this.getCurrentClient().errorHandler.attachErrorListener(clientWrapper);
}

attachNoOpErrorListener(clientWrapper: ClientWrapper | undefined): void {
this.getCurrentClient().errorHandler.attachNoOpErrorListener(clientWrapper);
}
}
8 changes: 7 additions & 1 deletion common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
this.failoverMode = this._rdsUrlType === RdsUrlType.RDS_READER_CLUSTER ? FailoverMode.READER_OR_WRITER : FailoverMode.STRICT_WRITER;
}

logger.debug(Messages.get("Failover.parameterValue", "failoverMode", this.failoverMode.toString()));
logger.debug(Messages.get("Failover.parameterValue", "failoverMode", FailoverMode[this.failoverMode]));
}

override notifyConnectionChanged(changes: Set<HostChangeOptions>): Promise<OldConnectionSuggestionAction> {
Expand Down Expand Up @@ -294,6 +294,12 @@ export class FailoverPlugin extends AbstractConnectionPlugin {

override async execute<T>(methodName: string, methodFunc: () => Promise<T>): Promise<T> {
try {
// Verify there aren't any unexpected error emitted while the connection was idle.
if (this.pluginService.hasNetworkError()) {
// Throw the unexpected error directly to be handled.
throw this.pluginService.getUnexpectedError();
}

if (!this.enableFailoverSetting || this.canDirectExecute(methodName)) {
return await methodFunc();
}
Expand Down
4 changes: 3 additions & 1 deletion common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,7 @@
"DefaultTelemetryFactory.importFailure": "A tracing backend could not be found.",
"DefaultTelemetryFactory.missingTracingBackend": "A tracing backend could not be found.",
"DefaultTelemetryFactory.missingMetricsBackend": "A metrics backend could not be found.",
"InternalPooledConnectionProvider.pooledConnectionFailed": "Internal pooled connection failed with message: '%s'"
"InternalPooledConnectionProvider.pooledConnectionFailed": "Internal pooled connection failed with message: '%s'",
"ErrorHandler.NoOpListener": "[%s] NoOp error event listener caught error: '%s'",
"ErrorHandler.TrackerListener": "[%s] Tracker error event listener caught error: '%s'"
}
4 changes: 1 addition & 3 deletions common/lib/utils/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

import path from "path";
import { I18n } from "i18n";
import { fileURLToPath } from "url";

export class Messages {
static __filename = fileURLToPath(import.meta.url);
static __dirname = path.dirname(Messages.__filename);
static __dirname = path.dirname(__filename);

static i18n = new I18n({
locales: ["en"],
Expand Down
5 changes: 4 additions & 1 deletion jest.integration.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
"^.+\\.ts$": [
"ts-jest",
{
"diagnostics": false
"diagnostics": false,
"useESM": true
}
]
},
"transformIgnorePatterns": ["node_modules"],
"moduleNameMapper": {
"^uuid$": "uuid"
},
"globalSetup": "<rootDir>/tests/integration/container/tests/setup.ts",
"globalTeardown": "<rootDir>/tests/integration/container/tests/teardown.ts",
"setupFilesAfterEnv": ["<rootDir>/tests/integration/container/tests/config.ts"],
"testEnvironment": "node",
"extensionsToTreatAsEsm": [".ts"],
"reporters": ["default", "./node_modules/jest-html-reporter"]
}
68 changes: 39 additions & 29 deletions mysql/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,39 +206,49 @@ export class AwsMySQLClient extends AwsClient {
}

async end() {
if (!this.isConnected || !this.targetClient) {
// No connections have been initialized.
// This might happen if end is called in a finally block when an error occurred while initializing the first connection.
return;
}
this.errorHandler.attachNoOpErrorListener(this.targetClient);
try {
if (!this.isConnected || !this.targetClient) {
// No connections have been initialized.
// This might happen if end is called in a finally block when an error occurred while initializing the first connection.
return;
}

const result = await this.pluginManager.execute(
this.pluginService.getCurrentHostInfo(),
this.properties,
"end",
() => {
return ClientUtils.queryWithTimeout(this.targetClient!.end(), this.properties);
},
null
);
await this.releaseResources();
return result;
const result = await this.pluginManager.execute(
this.pluginService.getCurrentHostInfo(),
this.properties,
"end",
() => {
return ClientUtils.queryWithTimeout(this.targetClient!.end(), this.properties);
},
null
);
await this.releaseResources();
return result;
} finally {
this.errorHandler.attachErrorListener(this.targetClient);
}
}

async rollback(): Promise<any> {
return this.pluginManager.execute(
this.pluginService.getCurrentHostInfo(),
this.properties,
"rollback",
async () => {
if (this.targetClient) {
this.pluginService.updateInTransaction("rollback");
return await this.targetClient.rollback();
}
return null;
},
null
);
this.errorHandler.attachNoOpErrorListener(this.targetClient);
try {
return this.pluginManager.execute(
this.pluginService.getCurrentHostInfo(),
this.properties,
"rollback",
async () => {
if (this.targetClient) {
this.pluginService.updateInTransaction("rollback");
return await this.targetClient.rollback();
}
return null;
},
null
);
} finally {
this.errorHandler.attachErrorListener(this.targetClient);
}
}

resetState() {
Expand Down
44 changes: 43 additions & 1 deletion mysql/lib/mysql_error_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@

import { ErrorHandler } from "../../common/lib/error_handler";
import { Messages } from "../../common/lib/utils/messages";
import { logger } from "../../common/logutils";
import { ClientWrapper } from "../../common/lib/client_wrapper";

export class MySQLErrorHandler implements ErrorHandler {
private static readonly SQLSTATE_ACCESS_ERROR = "28000";

private unexpectedError: Error | null = null;

protected noOpListener(error: any) {
// Ignore the received error.
logger.silly(Messages.get("ErrorHandler.NoOpListener", "MySQLErrorHandler", error.message));
}

protected trackingListener(error: any) {
this.unexpectedError = error;
logger.silly(Messages.get("ErrorHandler.TrackerListener", "MySQLErrorHandler", error.message));
}

isLoginError(e: Error): boolean {
if (Object.prototype.hasOwnProperty.call(e, "sqlState")) {
// @ts-ignore
Expand All @@ -38,4 +51,33 @@ export class MySQLErrorHandler implements ErrorHandler {
e.message.includes("connect ETIMEDOUT")
);
}

hasLoginError(): boolean {
return this.unexpectedError !== null && this.isLoginError(this.unexpectedError);
}

hasNetworkError(): boolean {
return this.unexpectedError !== null && this.isNetworkError(this.unexpectedError);
}

getUnexpectedError(): Error | null {
return this.unexpectedError;
}

attachErrorListener(clientWrapper: ClientWrapper | undefined): void {
if (!clientWrapper || !clientWrapper.client) {
return;
}
this.unexpectedError = null;
clientWrapper.client.removeListener("error", this.noOpListener);
clientWrapper.client.on("error", this.trackingListener);
}

attachNoOpErrorListener(clientWrapper: ClientWrapper | undefined): void {
if (!clientWrapper || !clientWrapper.client) {
return;
}
clientWrapper.client.removeListener("error", this.trackingListener);
clientWrapper.client.on("error", this.noOpListener);
}
}
Loading

0 comments on commit b908985

Please sign in to comment.