Skip to content

Commit

Permalink
refactor: session state service implementation (#322)
Browse files Browse the repository at this point in the history
  • Loading branch information
crystall-bitquill authored Jan 3, 2025
1 parent 4039c5a commit b9eedde
Show file tree
Hide file tree
Showing 27 changed files with 686 additions and 401 deletions.
11 changes: 3 additions & 8 deletions common/lib/aws_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ import { DriverConfigurationProfiles } from "./profile/driver_configuration_prof
import { ConfigurationProfile } from "./profile/configuration_profile";
import { AwsWrapperError } from "./utils/errors";
import { Messages } from "./utils/messages";
import { TransactionIsolationLevel } from "./utils/transaction_isolation_level";

export abstract class AwsClient extends EventEmitter {
private _defaultPort: number = -1;
protected telemetryFactory: TelemetryFactory;
protected pluginManager: PluginManager;
protected pluginService: PluginService;
protected isConnected: boolean = false;
protected _isReadOnly: boolean = false;
protected _isolationLevel: number = 0;
protected _connectionUrlParser: ConnectionUrlParser;
protected _configurationProfile: ConfigurationProfile | null = null;
readonly properties: Map<string, any>;
Expand Down Expand Up @@ -151,8 +150,6 @@ export abstract class AwsClient extends EventEmitter {
return this._connectionUrlParser;
}

abstract updateSessionStateReadOnly(readOnly: boolean): Promise<any | void>;

abstract setReadOnly(readOnly: boolean): Promise<any | void>;

abstract isReadOnly(): boolean;
Expand All @@ -161,9 +158,9 @@ export abstract class AwsClient extends EventEmitter {

abstract getAutoCommit(): boolean;

abstract setTransactionIsolation(transactionIsolation: number): Promise<any | void>;
abstract setTransactionIsolation(level: TransactionIsolationLevel): Promise<any | void>;

abstract getTransactionIsolation(): number;
abstract getTransactionIsolation(): TransactionIsolationLevel;

abstract setSchema(schema: any): Promise<any | void>;

Expand All @@ -179,8 +176,6 @@ export abstract class AwsClient extends EventEmitter {

abstract rollback(): Promise<any>;

abstract resetState(): void;

async isValid(): Promise<boolean> {
if (!this.targetClient) {
return Promise.resolve(false);
Expand Down
9 changes: 8 additions & 1 deletion common/lib/database_dialect/database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { HostListProviderService } from "../host_list_provider_service";
import { ClientWrapper } from "../client_wrapper";
import { FailoverRestriction } from "../plugins/failover/failover_restriction";
import { ErrorHandler } from "../error_handler";
import { SessionState } from "../session_state";
import { TransactionIsolationLevel } from "../utils/transaction_isolation_level";

export enum DatabaseType {
MYSQL,
Expand All @@ -30,6 +32,11 @@ export interface DatabaseDialect {
getHostAliasQuery(): string;
getHostAliasAndParseResults(targetClient: ClientWrapper): Promise<string>;
getServerVersionQuery(): string;
getSetReadOnlyQuery(readOnly: boolean): string;
getSetAutoCommitQuery(autoCommit: boolean): string;
getSetTransactionIsolationQuery(level: TransactionIsolationLevel): string;
getSetCatalogQuery(catalog: string): string;
getSetSchemaQuery(schema: string): string;
getDialectUpdateCandidates(): string[];
getErrorHandler(): ErrorHandler;
isDialect(targetClient: ClientWrapper): Promise<boolean>;
Expand All @@ -39,7 +46,7 @@ export interface DatabaseDialect {
getDialectName(): string;
getFailoverRestrictions(): FailoverRestriction[];
doesStatementSetReadOnly(statement: string): boolean | undefined;
doesStatementSetTransactionIsolation(statement: string): number | undefined;
doesStatementSetTransactionIsolation(statement: string): TransactionIsolationLevel | undefined;
doesStatementSetAutoCommit(statement: string): boolean | undefined;
doesStatementSetSchema(statement: string): string | undefined;
doesStatementSetCatalog(statement: string): string | undefined;
Expand Down
14 changes: 7 additions & 7 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { getWriter } from "./utils/utils";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { ConfigurationProfile } from "./profile/configuration_profile";
import { SessionState } from "./session_state";

export class PluginService implements ErrorHandler, HostListProviderService {
private readonly _currentClient: AwsClient;
Expand Down Expand Up @@ -75,10 +76,10 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this.dbDialectProvider = new DatabaseDialectManager(knownDialectsByCode, dbType, this.props);
this.driverDialect = driverDialect;
this.initialHost = props.get(WrapperProperties.HOST.name);
this.sessionStateService = new SessionStateServiceImpl(this, this.props);
container.pluginService = this;

this.dialect = WrapperProperties.CUSTOM_DATABASE_DIALECT.get(this.props) ?? this.dbDialectProvider.getDialect(this.props);
this.sessionStateService = new SessionStateServiceImpl(this, this.props);
}

isInTransaction(): boolean {
Expand Down Expand Up @@ -333,7 +334,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this.sessionStateService.begin();

try {
this.getCurrentClient().resetState();
this.getCurrentClient().targetClient = newClient;
this._currentHostInfo = hostInfo;
await this.sessionStateService.applyCurrentSessionState(this.getCurrentClient());
Expand Down Expand Up @@ -432,35 +432,35 @@ export class PluginService implements ErrorHandler, HostListProviderService {
private async updateReadOnly(statements: string[]) {
const updateReadOnly = SqlMethodUtils.doesSetReadOnly(statements, this.getDialect());
if (updateReadOnly !== undefined) {
await this.getCurrentClient().setReadOnly(updateReadOnly);
this.getSessionStateService().setReadOnly(updateReadOnly);
}
}

private async updateAutoCommit(statements: string[]) {
const updateAutoCommit = SqlMethodUtils.doesSetAutoCommit(statements, this.getDialect());
if (updateAutoCommit !== undefined) {
await this.getCurrentClient().setAutoCommit(updateAutoCommit);
this.getSessionStateService().setAutoCommit(updateAutoCommit);
}
}

private async updateCatalog(statements: string[]) {
const updateCatalog = SqlMethodUtils.doesSetCatalog(statements, this.getDialect());
if (updateCatalog !== undefined) {
await this.getCurrentClient().setCatalog(updateCatalog);
this.getSessionStateService().setCatalog(updateCatalog);
}
}

private async updateSchema(statements: string[]) {
const updateSchema = SqlMethodUtils.doesSetSchema(statements, this.getDialect());
if (updateSchema !== undefined) {
await this.getCurrentClient().setSchema(updateSchema);
this.getSessionStateService().setSchema(updateSchema);
}
}

private async updateTransactionIsolation(statements: string[]) {
const updateTransactionIsolation = SqlMethodUtils.doesSetTransactionIsolation(statements, this.getDialect());
if (updateTransactionIsolation !== undefined) {
await this.getCurrentClient().setTransactionIsolation(updateTransactionIsolation);
this.getSessionStateService().setTransactionIsolation(updateTransactionIsolation);
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,9 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
throw new FailoverFailedError(Messages.get("Failover.unableToConnectToReader"));
}

this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.pluginService.abortCurrentClient();
await this.pluginService.setCurrentClient(result.client, result.newHost);
this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.updateTopology(true);
this.failoverReaderSuccessCounter.inc();
} catch (error: any) {
Expand Down
2 changes: 2 additions & 0 deletions common/lib/pool_client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import { ClientWrapper } from "./client_wrapper";
import { HostInfo } from "./host_info";
import { uniqueId } from "../logutils";
import { ClientUtils } from "./utils/client_utils";
import { SessionState } from "./session_state";

export class PoolClientWrapper implements ClientWrapper {
readonly client: any;
readonly hostInfo: HostInfo;
readonly properties: Map<string, string>;
readonly id: string;
readonly sessionState = new SessionState();

constructor(targetClient: any, hostInfo: HostInfo, properties: Map<string, any>) {
this.client = targetClient;
Expand Down
148 changes: 126 additions & 22 deletions common/lib/session_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,29 @@
limitations under the License.
*/

class SessionStateField<Type> {
import { DatabaseDialect } from "./database_dialect/database_dialect";
import { AwsClient } from "./aws_client";
import { TransactionIsolationLevel } from "./utils/transaction_isolation_level";

export abstract class SessionStateField<Type> {
value?: Type;
pristineValue?: Type;

constructor(copy?: SessionStateField<Type>) {
if (copy) {
this.value = copy.value;
this.pristineValue = copy.pristineValue;
}
}

abstract setValue(state: SessionState): void;

abstract setPristineValue(state: SessionState): void;

abstract getQuery(dialect: DatabaseDialect, isPristine: boolean): string;

abstract getClientValue(client: AwsClient): Type;

resetValue(): void {
this.value = undefined;
}
Expand Down Expand Up @@ -60,38 +79,123 @@ class SessionStateField<Type> {
return true;
}

copy(): SessionStateField<Type> {
const newField: SessionStateField<Type> = new SessionStateField();
if (this.value !== undefined) {
newField.value = this.value;
}
toString() {
return `${this.pristineValue ?? "(blank)"} => ${this.value ?? "(blank)"}`;
}
}

if (this.pristineValue !== undefined) {
newField.pristineValue = this.pristineValue;
}
class AutoCommitState extends SessionStateField<boolean> {
setValue(state: SessionState) {
this.value = state.autoCommit.value;
}

return newField;
setPristineValue(state: SessionState) {
this.value = state.autoCommit.pristineValue;
}

toString() {
return `${this.pristineValue ?? "(blank)"} => ${this.value ?? "(blank)"}`;
getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetAutoCommitQuery(isPristine ? this.pristineValue : this.value);
}

getClientValue(client: AwsClient): boolean {
return client.getAutoCommit();
}
}

class ReadOnlyState extends SessionStateField<boolean> {
setValue(state: SessionState) {
this.value = state.readOnly.value;
}

setPristineValue(state: SessionState) {
this.value = state.readOnly.pristineValue;
}

getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetReadOnlyQuery(this.value);
}

getClientValue(client: AwsClient): boolean {
return client.isReadOnly();
}
}

class CatalogState extends SessionStateField<string> {
setValue(state: SessionState) {
this.value = state.catalog.value;
}

setPristineValue(state: SessionState) {
this.value = state.catalog.pristineValue;
}

getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetCatalogQuery(isPristine ? this.pristineValue : this.value);
}

getClientValue(client: AwsClient): string {
return client.getCatalog();
}
}

class SchemaState extends SessionStateField<string> {
setValue(state: SessionState) {
this.value = state.schema.value;
}

setPristineValue(state: SessionState) {
this.value = state.schema.pristineValue;
}

getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetSchemaQuery(isPristine ? this.pristineValue : this.value);
}

getClientValue(client: AwsClient): string {
return client.getSchema();
}
}

class TransactionIsolationState extends SessionStateField<TransactionIsolationLevel> {
setValue(state: SessionState) {
this.value = state.transactionIsolation.value;
}

setPristineValue(state: SessionState) {
this.value = state.transactionIsolation.pristineValue;
}

getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetTransactionIsolationQuery(isPristine ? this.pristineValue : this.value);
}

getClientValue(client: AwsClient): number {
return client.getTransactionIsolation();
}
}

export class SessionState {
autoCommit: SessionStateField<boolean> = new SessionStateField<boolean>();
readOnly: SessionStateField<boolean> = new SessionStateField<boolean>();
catalog: SessionStateField<string> = new SessionStateField<string>();
schema: SessionStateField<string> = new SessionStateField<string>();
transactionIsolation: SessionStateField<number> = new SessionStateField<number>();
autoCommit: AutoCommitState = new AutoCommitState();
readOnly: ReadOnlyState = new ReadOnlyState();
catalog: CatalogState = new CatalogState();
schema: SchemaState = new SchemaState();
transactionIsolation: TransactionIsolationState = new TransactionIsolationState();

static setState(target: SessionStateField<any>, source: SessionState): void {
target.setValue(source);
}

static setPristineState(target: SessionStateField<any>, source: SessionState): void {
target.setPristineValue(source);
}

copy(): SessionState {
const newSessionState = new SessionState();
newSessionState.autoCommit = this.autoCommit.copy();
newSessionState.readOnly = this.readOnly.copy();
newSessionState.catalog = this.catalog.copy();
newSessionState.schema = this.schema.copy();
newSessionState.transactionIsolation = this.transactionIsolation.copy();
newSessionState.autoCommit = new AutoCommitState(this.autoCommit);
newSessionState.readOnly = new ReadOnlyState(this.readOnly);
newSessionState.catalog = new CatalogState(this.catalog);
newSessionState.schema = new SchemaState(this.schema);
newSessionState.transactionIsolation = new TransactionIsolationState(this.transactionIsolation);

return newSessionState;
}
Expand Down
22 changes: 12 additions & 10 deletions common/lib/session_state_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import { AwsClient } from "./aws_client";
import { TransactionIsolationLevel } from "./utils/transaction_isolation_level";

export interface SessionStateService {
// auto commit
Expand All @@ -26,26 +27,27 @@ export interface SessionStateService {
// read only
getReadOnly(): boolean | undefined;
setReadOnly(readOnly: boolean): void;
setupPristineReadOnly(): boolean | undefined;
setupPristineReadOnly(readOnly: boolean): boolean | undefined;
setupPristineReadOnly(): void;
setupPristineReadOnly(readOnly: boolean): void;
updateReadOnly(readOnly: boolean): void;

// catalog
getCatalog(): string | undefined;
setCatalog(catalog: string): void;
setupPristineCatalog(): string | undefined;
setupPristineCatalog(catalog: string): string | undefined;
setupPristineCatalog(): void;
setupPristineCatalog(catalog: string): void;

// schema
getSchema(): string | undefined;
setSchema(schema: string): void;
setupPristineSchema(): string | undefined;
setupPristineSchema(schema: string): string | undefined;
setupPristineSchema(): void;
setupPristineSchema(schema: string): void;

// transaction isolation
getTransactionIsolation(): number | undefined;
setTransactionIsolation(transactionIsolation: number): void;
setupPristineTransactionIsolation(): number | undefined;
setupPristineTransactionIsolation(transactionIsolation: number): number | undefined;
getTransactionIsolation(): TransactionIsolationLevel | undefined;
setTransactionIsolation(transactionIsolation: TransactionIsolationLevel): void;
setupPristineTransactionIsolation(): void;
setupPristineTransactionIsolation(transactionIsolation: TransactionIsolationLevel): void;

reset(): void;

Expand Down
Loading

0 comments on commit b9eedde

Please sign in to comment.