Skip to content

Commit

Permalink
monitoring impl fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
joyc-bq committed Jan 20, 2025
1 parent df380fc commit e4dcaeb
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { CanReleaseResources } from "../../can_release_resources";
import { SubscribedMethodHelper } from "../../utils/subscribed_method_helper";
import { ClientWrapper } from "../../client_wrapper";

export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin implements CanReleaseResources {
export class HostMonitoring2ConnectionPlugin extends AbstractConnectionPlugin implements CanReleaseResources {
id: string = uniqueId("_efm2Plugin");
private readonly properties: Map<string, any>;
private pluginService: PluginService;
Expand Down
4 changes: 2 additions & 2 deletions common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ export class HostMonitoring2PluginFactory extends ConnectionPluginFactory {
async getInstance(pluginService: PluginService, properties: Map<string, any>): Promise<ConnectionPlugin> {
try {
if (!HostMonitoring2PluginFactory.hostMonitoring2Plugin) {
HostMonitoring2PluginFactory.hostMonitoring2Plugin = await import("./host_monitoring_connection_plugin");
HostMonitoring2PluginFactory.hostMonitoring2Plugin = await import("./host_monitoring2_connection_plugin");
}
return new HostMonitoring2PluginFactory.hostMonitoring2Plugin.HostMonitoringConnectionPlugin(pluginService, properties, new RdsUtils());
return new HostMonitoring2PluginFactory.hostMonitoring2Plugin.HostMonitoring2ConnectionPlugin(pluginService, properties, new RdsUtils());
} catch (error: any) {
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "HostMonitoringPlugin"));
}
Expand Down
19 changes: 12 additions & 7 deletions common/lib/plugins/efm2/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
import { HostAvailability } from "../../host_availability/host_availability";
import { MapUtils } from "../../utils/map_utils";
import { AwsWrapperError, InternalQueryTimeoutError } from "../../utils/errors";

export interface Monitor {
startMonitoring(context: MonitorConnectionContext): void;
Expand Down Expand Up @@ -83,11 +84,14 @@ export class MonitorImpl implements Monitor {

this.telemetryFactory.createGauge(`efm2.activeContexts.size.${hostId}`, () => this.activeContexts.length == Number.MAX_SAFE_INTEGER);

this.telemetryFactory.createGauge(`efm2.hostHealthy.${hostId}`, () => !!this.hostUnhealthy);
this.telemetryFactory.createGauge(`efm2.hostHealthy.${hostId}`, () => (this.hostUnhealthy ? 0 : 1));
const task1 = this.newContextRun();
const task2 = this.run();
Promise.race([task1, task2]).then(() => {
console.log("both tasks done");
Promise.all([task1, task2]).catch((error: any) => {
if (error instanceof InternalQueryTimeoutError) {
throw error;
}
throw new AwsWrapperError(error);
});
}

Expand All @@ -114,7 +118,7 @@ export class MonitorImpl implements Monitor {
startMonitorTimeNano,
(k) => new Array<WeakRef<MonitorConnectionContext>>()
);
connectionQueue!.push(new WeakRef<MonitorConnectionContext>(context));
connectionQueue.push(new WeakRef<MonitorConnectionContext>(context));
}

async newContextRun(): Promise<void> {
Expand Down Expand Up @@ -150,6 +154,7 @@ export class MonitorImpl implements Monitor {
} catch (err) {
// do nothing, exit task
}
logger.debug(Messages.get("MonitorImpl.stopMonitoringTaskNewContext", this.hostInfo.host));
}

async run(): Promise<void> {
Expand All @@ -158,7 +163,7 @@ export class MonitorImpl implements Monitor {
try {
while (!this.isStopped()) {
try {
while (this.activeContexts.length > 0 && !this.hostUnhealthy) {
while (this.activeContexts.length === 0 && !this.hostUnhealthy) {
await sleep(MonitorImpl.TASK_SLEEP_NANOS / 1000000);
}

Expand All @@ -168,7 +173,7 @@ export class MonitorImpl implements Monitor {

await this.updateHostHealthStatus(isValid, statusCheckStartTimeNanos, statusCheckEndTimeNanos);

if (!this.hostUnhealthy) {
if (this.hostUnhealthy) {
this.pluginService.setAvailability(this.hostInfo.aliases, HostAvailability.NOT_AVAILABLE);
}
const tmpActiveContexts: WeakRef<MonitorConnectionContext>[] = [];
Expand Down Expand Up @@ -231,7 +236,7 @@ export class MonitorImpl implements Monitor {
try {
const clientIsValid = this.monitoringClient && (await this.pluginService.isClientValid(this.monitoringClient));

if (this.monitoringClient !== null && clientIsValid) {
if (this.monitoringClient === null && !clientIsValid) {
// Open a new connection.
const monitoringConnProperties: Map<string, any> = new Map(this.properties);

Expand Down
Loading

0 comments on commit e4dcaeb

Please sign in to comment.