Skip to content

Commit

Permalink
fix: integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
karenc-bq committed Jan 2, 2025
1 parent a64f3d7 commit 6eb34c7
Show file tree
Hide file tree
Showing 23 changed files with 208 additions and 263 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
push:
branches:
- main
- refactor/update-read-only
paths-ignore:
- "**/*.md"
- "**/*.jpg"
Expand Down
2 changes: 0 additions & 2 deletions common/lib/client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
*/

import { HostInfo } from "./host_info";
import { SessionState } from "./session_state";

export interface ClientWrapper {
readonly client: any;
readonly hostInfo: HostInfo;
readonly properties: Map<string, any>;
readonly id: string;
readonly sessionState: SessionState;

query(sql: any): Promise<any>;

Expand Down
1 change: 0 additions & 1 deletion common/lib/mysql_client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export class MySQLClientWrapper implements ClientWrapper {
readonly hostInfo: HostInfo;
readonly properties: Map<string, string>;
readonly id: string;
readonly sessionState: SessionState = new SessionState();

/**
* Creates a wrapper for the target community driver client.
Expand Down
2 changes: 0 additions & 2 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
async abortCurrentClient(): Promise<void> {
if (this._currentClient.targetClient) {
await this._currentClient.targetClient.abort();
// this.setInTransaction(false);
// this.getSessionStateService().reset();
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,9 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
throw new FailoverFailedError(Messages.get("Failover.unableToConnectToReader"));
}

this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.pluginService.abortCurrentClient();
await this.pluginService.setCurrentClient(result.client, result.newHost);
this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.updateTopology(true);
this.failoverReaderSuccessCounter.inc();
} catch (error: any) {
Expand Down
6 changes: 4 additions & 2 deletions common/lib/session_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ export abstract class SessionStateField<Type> {
pristineValue?: Type;

constructor(copy?: SessionStateField<Type>) {
this.value = copy.value;
this.pristineValue = copy.pristineValue;
if (copy) {
this.value = copy.value;
this.pristineValue = copy.pristineValue;
}
}

abstract setValue(state: SessionState): void;
Expand Down
50 changes: 11 additions & 39 deletions common/lib/session_state_service_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ export class SessionStateServiceImpl implements SessionStateService {
const targetClient: ClientWrapper = newClient.targetClient;

// Apply current state for all 5 states: autoCommit, readOnly, catalog, schema, transactionIsolation
for (const key of Object.keys(this.copySessionState)) {
const state = this.copySessionState[key];
if (state.constructor === SessionStateField) {
for (const key of Object.keys(this.sessionState)) {
const state = this.sessionState[key];
if (state instanceof SessionStateField) {
await this.applyCurrentState(targetClient, state);
}
}
Expand Down Expand Up @@ -93,7 +93,7 @@ export class SessionStateServiceImpl implements SessionStateService {
// The states that will be set are: autoCommit, readonly, schema, catalog, transactionIsolation.
for (const key of Object.keys(this.copySessionState)) {
const state = this.copySessionState[key];
if (state.constructor === SessionStateField) {
if (state instanceof SessionStateField) {
await this.setPristineStateOnTarget(targetClient, state, key);
}
}
Expand All @@ -104,12 +104,7 @@ export class SessionStateServiceImpl implements SessionStateService {
}

setAutoCommit(autoCommit: boolean): void {
if (!this.transferStateEnabledSetting()) {
return;
}

this.sessionState.autoCommit.value = autoCommit;
this.logCurrentState();
return this.setState("autoCommit", autoCommit);
}

setupPristineAutoCommit(): void;
Expand All @@ -123,12 +118,7 @@ export class SessionStateServiceImpl implements SessionStateService {
}

setCatalog(catalog: string): void {
if (!this.transferStateEnabledSetting()) {
return;
}

this.sessionState.catalog.value = catalog;
this.logCurrentState();
return this.setState("catalog", catalog);
}

setupPristineCatalog(): void;
Expand All @@ -142,12 +132,7 @@ export class SessionStateServiceImpl implements SessionStateService {
}

setReadOnly(readOnly: boolean): void {
if (!this.transferStateEnabledSetting()) {
return;
}

this.sessionState.readOnly.value = readOnly;
this.logCurrentState();
return this.setState("readOnly", readOnly);
}

setupPristineReadOnly(): void;
Expand All @@ -157,24 +142,16 @@ export class SessionStateServiceImpl implements SessionStateService {
}

updateReadOnly(readOnly: boolean): void {
// TODO: review this
// this.pluginService.getSessionStateService().setupPristineReadOnly(readOnly);
// this.pluginService.getSessionStateService().setReadOnly(readOnly);
this.setupPristineState(this.sessionState.readOnly, readOnly);
this.setState(this.sessionState.readOnly, readOnly);
this.pluginService.getSessionStateService().setupPristineReadOnly(readOnly);
this.pluginService.getSessionStateService().setReadOnly(readOnly);
}

getSchema(): string | undefined {
return this.sessionState.schema.value;
}

setSchema(schema: string): void {
if (!this.transferStateEnabledSetting()) {
return;
}

this.sessionState.schema.value = schema;
this.logCurrentState();
return this.setState("schema", schema);
}

setupPristineSchema(): void;
Expand All @@ -188,12 +165,7 @@ export class SessionStateServiceImpl implements SessionStateService {
}

setTransactionIsolation(transactionIsolation: number): void {
if (!this.transferStateEnabledSetting()) {
return;
}

this.sessionState.transactionIsolation.value = transactionIsolation;
this.logCurrentState();
return this.setState("transactionIsolation", transactionIsolation);
}

setupPristineTransactionIsolation(): void;
Expand Down
8 changes: 4 additions & 4 deletions common/lib/utils/sql_method_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class SqlMethodUtils {
}

static doesSetAutoCommit(statements: string[], dialect: DatabaseDialect): boolean | undefined {
let autoCommit;
let autoCommit = undefined;
for (const statement of statements) {
const cleanStatement = statement
.toLowerCase()
Expand All @@ -60,7 +60,7 @@ export class SqlMethodUtils {
}

static doesSetCatalog(statements: string[], dialect: DatabaseDialect): string | undefined {
let catalog;
let catalog = undefined;
for (const statement of statements) {
const cleanStatement = statement
.toLowerCase()
Expand All @@ -73,7 +73,7 @@ export class SqlMethodUtils {
}

static doesSetSchema(statements: string[], dialect: DatabaseDialect): string | undefined {
let schema;
let schema = undefined;
for (const statement of statements) {
const cleanStatement = statement
.toLowerCase()
Expand All @@ -86,7 +86,7 @@ export class SqlMethodUtils {
}

static doesSetTransactionIsolation(statements: string[], dialect: DatabaseDialect): TransactionIsolationLevel | undefined {
let transactionIsolation;
let transactionIsolation = undefined;
for (const statement of statements) {
const cleanStatement = statement
.toLowerCase()
Expand Down
12 changes: 4 additions & 8 deletions mysql/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,12 @@ export class AwsMySQLClient extends AwsClient {
async setReadOnly(readOnly: boolean): Promise<Query | void> {
this.pluginService.getSessionStateService().setupPristineReadOnly();
const result = await this.queryWithoutUpdate({ sql: `SET SESSION TRANSACTION READ ${readOnly ? "ONLY" : "WRITE"}` });
this.targetClient.sessionState.readOnly.value = readOnly;
this.pluginService.getSessionStateService().updateReadOnly(readOnly);
return result;
}

isReadOnly(): boolean {
return this.targetClient.sessionState.readOnly.value;
return this.pluginService.getSessionStateService().getReadOnly();
}

async setAutoCommit(autoCommit: boolean): Promise<Query | void> {
Expand All @@ -113,24 +112,22 @@ export class AwsMySQLClient extends AwsClient {
setting = "0";
}
const result = await this.queryWithoutUpdate({ sql: `SET AUTOCOMMIT=${setting}` });
this.targetClient.sessionState.autoCommit.value = autoCommit;
this.pluginService.getSessionStateService().setAutoCommit(autoCommit);
return result;
}

getAutoCommit(): boolean {
return this.targetClient.sessionState.autoCommit.value;
return this.pluginService.getSessionStateService().getAutoCommit();
}

async setCatalog(catalog: string): Promise<Query | void> {
this.pluginService.getSessionStateService().setupPristineCatalog();
await this.queryWithoutUpdate({ sql: `USE ${catalog}` });
this.targetClient.sessionState.catalog.value = catalog;
this.pluginService.getSessionStateService().setCatalog(catalog);
}

getCatalog(): string {
return this.targetClient.sessionState.catalog.value;
return this.pluginService.getSessionStateService().getCatalog();
}

async setSchema(schema: string): Promise<Query | void> {
Expand Down Expand Up @@ -161,12 +158,11 @@ export class AwsMySQLClient extends AwsClient {
throw new AwsWrapperError(Messages.get("Client.invalidTransactionIsolationLevel", String(level)));
}

this.targetClient.sessionState.transactionIsolation.value = level;
this.pluginService.getSessionStateService().setTransactionIsolation(level);
}

getTransactionIsolation(): number {
return this.targetClient.sessionState.transactionIsolation.value;
return this.pluginService.getSessionStateService().getTransactionIsolation();
}

async end() {
Expand Down
6 changes: 2 additions & 4 deletions mysql/lib/dialect/mysql_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import { DatabaseDialect, DatabaseType } from "../../../common/lib/database_dialect/database_dialect";
import { HostListProviderService } from "../../../common/lib/host_list_provider_service";
import { HostListProvider } from "../../../common/lib/host_list_provider/host_list_provider";
import {
ConnectionStringHostListProvider
} from "../../../common/lib/host_list_provider/connection_string_host_list_provider";
import { ConnectionStringHostListProvider } from "../../../common/lib/host_list_provider/connection_string_host_list_provider";
import { AwsWrapperError, UnsupportedMethodError } from "../../../common/lib/utils/errors";
import { DatabaseDialectCodes } from "../../../common/lib/database_dialect/database_dialect_codes";
import { TransactionIsolationLevel } from "../../../common/lib/utils/transaction_isolation_level";
Expand All @@ -34,7 +32,7 @@ import { Messages } from "../../../common/lib/utils/messages";
export class MySQLDatabaseDialect implements DatabaseDialect {
protected dialectName: string = this.constructor.name;
protected defaultPort: number = 3306;

getDefaultPort(): number {
return this.defaultPort;
}
Expand Down
11 changes: 4 additions & 7 deletions pg/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,12 @@ export class AwsPGClient extends AwsClient {
async setReadOnly(readOnly: boolean): Promise<QueryResult | void> {
this.pluginService.getSessionStateService().setupPristineReadOnly();
const result = await this.queryWithoutUpdate(`SET SESSION CHARACTERISTICS AS TRANSACTION READ ${readOnly ? "ONLY" : "WRITE"}`);
this.targetClient.sessionState.readOnly.value = readOnly;
this.pluginService.getSessionStateService().updateReadOnly(readOnly);
return result;
}

isReadOnly(): boolean {
return this.targetClient.sessionState.readOnly.value;
return this.pluginService.getSessionStateService().getReadOnly();
}

async setAutoCommit(autoCommit: boolean): Promise<QueryResult | void> {
Expand All @@ -113,7 +112,7 @@ export class AwsPGClient extends AwsClient {

switch (level) {
case 0:
await this.queryWithoutUpdate("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED");
await this.queryWithoutUpdate("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
break;
case 1:
await this.queryWithoutUpdate("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED");
Expand All @@ -127,12 +126,11 @@ export class AwsPGClient extends AwsClient {
default:
throw new AwsWrapperError(Messages.get("Client.invalidTransactionIsolationLevel", String(level)));
}
this.targetClient.sessionState.transactionIsolation.value = level;
this.pluginService.getSessionStateService().setTransactionIsolation(level);
}

getTransactionIsolation(): number {
return this.targetClient.sessionState.transactionIsolation.value;
return this.pluginService.getSessionStateService().getTransactionIsolation();
}

async setCatalog(catalog: string): Promise<void> {
Expand All @@ -150,13 +148,12 @@ export class AwsPGClient extends AwsClient {

this.pluginService.getSessionStateService().setupPristineSchema();
const result = await this.queryWithoutUpdate(`SET search_path TO ${schema};`);
this.targetClient.sessionState.schema.value = schema;
this.pluginService.getSessionStateService().setSchema(schema);
return result;
}

getSchema(): string {
return this.targetClient.sessionState.schema.value;
return this.pluginService.getSessionStateService().getSchema();
}

async end() {
Expand Down
Loading

0 comments on commit 6eb34c7

Please sign in to comment.