Skip to content

Commit

Permalink
Merge pull request #76 from longzheng/fix-coordinator-cleanup
Browse files Browse the repository at this point in the history
Fix coordinator cleanup
  • Loading branch information
longzheng authored Jan 28, 2025
2 parents dc3352f + 1a26b0a commit 3a8b3b0
Show file tree
Hide file tree
Showing 44 changed files with 502 additions and 165 deletions.
17 changes: 13 additions & 4 deletions src/connections/modbus/connection/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class ModbusConnection {
private readonly mutex = new Mutex();

private state:
| { type: 'connected' }
| { type: 'connected'; abortController: AbortController }
| { type: 'connecting'; connectPromise: Promise<void> }
| { type: 'disconnected' } = { type: 'disconnected' };

Expand All @@ -30,7 +30,7 @@ export class ModbusConnection {
this.client.on('close', () => {
this.state = { type: 'disconnected' };

this.logger.error(`Modbus client closed`);
this.logger.info(`Modbus client closed`);
});

// This is never observed to be triggered
Expand All @@ -56,12 +56,17 @@ export class ModbusConnection {
try {
this.logger.info(`Modbus client connecting`);

const abortController = new AbortController();

switch (this.config.type) {
case 'tcp':
await this.client.connectTCP(this.config.ip, {
port: this.config.port,
// timeout for connection
timeout: connectionTimeoutMs,
socketOpts: {
signal: abortController.signal,
},
});
break;
case 'rtu':
Expand All @@ -79,7 +84,7 @@ export class ModbusConnection {

this.logger.info(`Modbus client connected`);

this.state = { type: 'connected' };
this.state = { type: 'connected', abortController };
} catch (error) {
this.logger.error(
error,
Expand Down Expand Up @@ -149,7 +154,11 @@ export class ModbusConnection {
});
}

close() {
destroy() {
if (this.state.type === 'connected') {
this.state.abortController.abort();
}

this.client.close(() => {});
}
}
2 changes: 1 addition & 1 deletion src/connections/modbus/connection/sma.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,6 @@ export class SmaConnection {
}

public onDestroy(): void {
this.modbusConnection.close();
this.modbusConnection.destroy();
}
}
16 changes: 12 additions & 4 deletions src/connections/powerwall2/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,25 @@ describe('Powerwall2Client', () => {
afterAll(() => mockServer.close());

it('can get meter aggregates', async () => {
const result = await power2Client.getMeterAggregates();
const result = await power2Client.getMeterAggregates({
signal: AbortSignal.timeout(1000),
});

expect(result.site.frequency).toBe(50);
});

it('can get state of energy', async () => {
const result = await power2Client.getSoe();
const result = await power2Client.getSoe({
signal: AbortSignal.timeout(1000),
});

expect(result.percentage).toBe(80.87499075192977);
});

it('can get meter site for three phase', async () => {
const result = await power2Client.getMetersSite();
const result = await power2Client.getMetersSite({
signal: AbortSignal.timeout(1000),
});

expect(result[0]?.Cached_readings.reactive_power_a).toBe(-770);
expect(result[0]?.Cached_readings.reactive_power_b).toBe(-440);
Expand All @@ -72,7 +78,9 @@ describe('Powerwall2Client', () => {
}),
);

const result = await power2Client.getMetersSite();
const result = await power2Client.getMetersSite({
signal: AbortSignal.timeout(1000),
});

expect(result[0]?.Cached_readings.reactive_power_a).toBe(-600);
expect(result[0]?.Cached_readings.reactive_power_b).toBe(undefined);
Expand Down
20 changes: 10 additions & 10 deletions src/connections/powerwall2/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type Logger } from 'pino';
import { pinoLogger } from '../../helpers/logger.js';
import { type AxiosInstance } from 'axios';
import { type AxiosRequestConfig, type AxiosInstance } from 'axios';
import * as https from 'node:https';
import axios, { AxiosError } from 'axios';
import {
Expand Down Expand Up @@ -42,24 +42,24 @@ export class Powerwall2Client {
void this.getToken();
}

public async getMeterAggregates() {
const response = await this.get('/api/meters/aggregates');
public async getMeterAggregates({ signal }: { signal: AbortSignal }) {
const response = await this.get('/api/meters/aggregates', { signal });

const data = meterAggregatesSchema.parse(response);

return data;
}

public async getSoe() {
const response = await this.get('/api/system_status/soe');
public async getSoe({ signal }: { signal: AbortSignal }) {
const response = await this.get('/api/system_status/soe', { signal });

const data = systemStatusSoeSchema.parse(response);

return data;
}

public async getMetersSite() {
const response = await this.get('/api/meters/site');
public async getMetersSite({ signal }: { signal: AbortSignal }) {
const response = await this.get('/api/meters/site', { signal });

const data = metersSiteSchema.parse(response);

Expand Down Expand Up @@ -107,11 +107,11 @@ export class Powerwall2Client {

private async get(
url: string,
params?: Record<string, string>,
options?: Omit<AxiosRequestConfig<never>, 'headers'>,
): Promise<unknown> {
try {
const response = await this.axiosInstance.get<string>(url, {
params,
...options,
headers: {
Cookie: `AuthCookie=${await this.getToken()}`,
},
Expand All @@ -130,7 +130,7 @@ export class Powerwall2Client {
this.token = { type: 'none' };
await this.getToken();

return this.get(url, params);
return this.get(url, options);
}

throw error;
Expand Down
2 changes: 1 addition & 1 deletion src/connections/sunspec/connection/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@ export abstract class SunSpecConnection {
}

public onDestroy(): void {
this.modbusConnection.close();
this.modbusConnection.destroy();
}
}
42 changes: 33 additions & 9 deletions src/coordinator/helpers/inverterController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ export class InverterController {
// we take a time weighted average of the last few seconds to smooth out the control values
private secondsToSample: number;
private intervalSeconds: number;
private controlLimitsLoopTimer: NodeJS.Timeout | null = null;
private applyControlLoopTimer: NodeJS.Timeout | null = null;
private abortController: AbortController;

constructor({
config,
Expand All @@ -112,6 +115,7 @@ export class InverterController {
this.intervalSeconds = config.inverterControl.intervalSeconds;
this.limiters = limiters;
this.logger = pinoLogger.child({ module: 'InverterController' });
this.abortController = new AbortController();
this.onControl = onControl;

this.updateControlLimitsLoop();
Expand Down Expand Up @@ -196,7 +200,7 @@ export class InverterController {
// update at most every 1 second
const delay = Math.max(1000 - duration, 0);

setTimeout(() => {
this.controlLimitsLoopTimer = setTimeout(() => {
void this.updateControlLimitsLoop();
}, delay);
}
Expand Down Expand Up @@ -226,18 +230,26 @@ export class InverterController {
} catch (error) {
this.logger.error(error, 'Failed to set inverter control values');
} finally {
const end = performance.now();
const duration = end - start;
if (!this.abortController.signal.aborted) {
const end = performance.now();
const duration = end - start;

this.logger.trace({ duration }, 'Inverter control loop duration');
this.logger.trace(
{ duration },
'Inverter control loop duration',
);

writeLatency({ field: 'applyControlLoop', duration });
writeLatency({ field: 'applyControlLoop', duration });

const delay = Math.max(this.intervalSeconds * 1000 - duration, 0);
const delay = Math.max(
this.intervalSeconds * 1000 - duration,
0,
);

setTimeout(() => {
void this.applyControlLoop();
}, delay);
this.applyControlLoopTimer = setTimeout(() => {
void this.applyControlLoop();
}, delay);
}
}
}

Expand Down Expand Up @@ -361,6 +373,18 @@ export class InverterController {

return rampedInverterConfiguration;
}

public destroy() {
this.abortController.abort();

if (this.controlLimitsLoopTimer) {
clearTimeout(this.controlLimitsLoopTimer);
}

if (this.applyControlLoopTimer) {
clearTimeout(this.applyControlLoopTimer);
}
}
}

export function calculateInverterConfiguration({
Expand Down
7 changes: 5 additions & 2 deletions src/coordinator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { getSep2Instance } from '../sep2/index.js';
import { getSiteSamplePollerInstance } from './helpers/siteSample.js';
import { type SiteSamplePollerBase } from '../meters/siteSamplePollerBase.js';
import { InvertersPoller } from './helpers/inverterSample.js';
import { type Limiters } from '../limiters/index.js';
import { destroyLimiters, type Limiters } from '../limiters/index.js';
import { getLimiters } from '../limiters/index.js';

const logger = pinoLogger.child({ module: 'coordinator' });
Expand Down Expand Up @@ -79,8 +79,11 @@ export function createCoordinator(): Coordinator {
limiters,
destroy: () => {
logger.info('Destroying coordinator');

sep2Instance?.destroy();
siteSamplePoller.destroy();
invertersPoller.destroy();
inverterController.destroy();
destroyLimiters(limiters);
},
};
}
12 changes: 12 additions & 0 deletions src/helpers/withAbortCheck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export async function withAbortCheck<T>({
signal,
fn,
}: {
signal: AbortSignal;
fn: () => Promise<T>;
}): Promise<T> {
if (signal.aborted) {
throw new Error('Operation was aborted');
}
return await fn();
}
16 changes: 16 additions & 0 deletions src/helpers/withRetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,20 @@ describe('withRetry', () => {
expect(fn).toHaveBeenCalledTimes(2);
expect(pinoLogger.warn).toHaveBeenCalledTimes(1);
});

it('should throw error when operation is aborted', async () => {
const abortController = new AbortController();
const mockFn = vi.fn().mockRejectedValue(new Error('Test error'));

const retryPromise = withRetry(mockFn, {
attempts: 3,
functionName: 'mockFn',
delayMilliseconds: 10,
abortController,
});

abortController.abort();

await expect(retryPromise).rejects.toThrow('Operation was aborted');
});
});
10 changes: 10 additions & 0 deletions src/helpers/withRetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,28 @@ export async function withRetry<T>(
attempts,
functionName,
delayMilliseconds,
abortController,
}: {
attempts: number;
functionName: string;
delayMilliseconds?: number;
abortController?: AbortController;
},
): Promise<T> {
for (let attempt = 1; attempt <= attempts; attempt++) {
try {
const result = await fn();

if (abortController?.signal.aborted) {
throw new Error('Operation was aborted');
}

return result;
} catch (error) {
if (abortController?.signal.aborted) {
throw new Error('Operation was aborted');
}

pinoLogger.warn(
error,
`${functionName} withRetry attempt ${attempt} of ${attempts} failed`,
Expand Down
9 changes: 9 additions & 0 deletions src/inverter/inverterDataPollerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export abstract class InverterDataPollerBase extends EventEmitter<{
protected applyControl: boolean;
protected readonly inverterIndex: number;
private inverterPollerName: string;
protected readonly abortController: AbortController;

constructor({
name,
Expand All @@ -41,9 +42,12 @@ export abstract class InverterDataPollerBase extends EventEmitter<{
this.applyControl = applyControl;
this.inverterIndex = inverterIndex;
this.inverterPollerName = name;
this.abortController = new AbortController();
}

public destroy() {
this.abortController.abort();

if (this.pollingTimer) {
clearTimeout(this.pollingTimer);
}
Expand Down Expand Up @@ -71,9 +75,14 @@ export abstract class InverterDataPollerBase extends EventEmitter<{
attempts: 3,
functionName: 'getInverterData',
delayMilliseconds: 100,
abortController: this.abortController,
}),
);

if (this.abortController.signal.aborted) {
return;
}

this.inverterDataCache = inverterData;

const end = performance.now();
Expand Down
Loading

0 comments on commit 3a8b3b0

Please sign in to comment.