From 7ff81747226c2aa7bcdcef67a6f941106f43a5f4 Mon Sep 17 00:00:00 2001 From: bangbang93 Date: Wed, 3 Apr 2024 09:56:29 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=8B=AC=E7=AB=8Bkeepalive?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cluster.ts | 82 +++++------------------------------------- src/keepalive.ts | 93 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 73 deletions(-) create mode 100644 src/keepalive.ts diff --git a/src/cluster.ts b/src/cluster.ts index 34334a5..dfedc03 100644 --- a/src/cluster.ts +++ b/src/cluster.ts @@ -1,5 +1,4 @@ import {decompress} from '@mongodb-js/zstd' -import Bluebird from 'bluebird' import {ChildProcess, spawn} from 'child_process' import {MultiBar} from 'cli-progress' import colors from 'colors/safe.js' @@ -12,24 +11,22 @@ import {createServer, Server} from 'http' import {createSecureServer} from 'http2' import http2Express from 'http2-express-bridge' import {Agent as HttpsAgent} from 'https' -import {clone, template, toString} from 'lodash-es' +import {template, toString} from 'lodash-es' import morgan from 'morgan' import ms from 'ms' import {constants} from 'node:http2' import {userInfo} from 'node:os' -import {clearTimeout} from 'node:timers' import {tmpdir} from 'os' import pMap from 'p-map' import pRetry from 'p-retry' -import pTimeout from 'p-timeout' import {basename, dirname, join} from 'path' -import prettyBytes from 'pretty-bytes' import {connect, Socket} from 'socket.io-client' import {Tail} from 'tail' import {fileURLToPath} from 'url' import {config, type OpenbmclapiAgentConfiguration, OpenbmclapiAgentConfigurationSchema} from './config.js' import {FileListSchema} from './constants.js' import {validateFile} from './file.js' +import {Keepalive} from './keepalive.js' import {logger} from './logger.js' import MeasureRouteFactory from './measure.route.js' import {getStorage, type IStorage} from './storage/base.storage.js' @@ -50,12 +47,10 @@ export class Cluster { public readonly counters: ICounters = {hits: 0, bytes: 0} public isEnabled = false public wantEnable = false - public keepAliveInterval?: NodeJS.Timeout public interval?: NodeJS.Timeout public nginxProcess?: ChildProcess public readonly storage: IStorage - private keepAliveError = 0 private readonly prefixUrl = process.env.CLUSTER_BMCLAPI ?? 'https://openbmclapi.bangbang93.com' private readonly host?: string private _port: number | string @@ -64,6 +59,7 @@ export class Cluster { private readonly got: Got private readonly requestCache = new Map() private readonly tmpDir = join(tmpdir(), 'openbmclapi') + private readonly keepalive = new Keepalive(ms('1m'), this) private socket?: Socket private server?: Server @@ -419,7 +415,7 @@ export class Cluster { } public async disable(): Promise { - clearTimeout(this.keepAliveInterval) + this.keepalive.stop() this.wantEnable = false return await new Promise((resolve, reject) => { this.socket?.emit('disable', null, ([err, ack]: [unknown, unknown]) => { @@ -445,28 +441,6 @@ export class Cluster { }) } - public async keepAlive(): Promise { - if (!this.isEnabled) { - throw new Error('节点未启用') - } - if (!this.socket) { - throw new Error('未连接到服务器') - } - - const counters = clone(this.counters) - const [err, date] = (await this.socket.emitWithAck('keep-alive', { - time: new Date(), - ...counters, - })) as [object, unknown] - - if (err) throw new Error('keep alive error', {cause: err}) - const bytes = prettyBytes(counters.bytes, {binary: true}) - logger.info(`keep alive success, serve ${counters.hits} files, ${bytes}`) - this.counters.hits -= counters.hits - this.counters.bytes -= counters.bytes - return !!date - } - public async requestCert(): Promise { const cert = await new Promise<{cert: string; key: string}>((resolve, reject) => { this.socket?.emit('request-cert', ([err, cert]: [unknown, {cert: string; key: string}]) => { @@ -489,8 +463,11 @@ export class Cluster { private async _enable(): Promise { let err: unknown let ack: unknown + if (!this.socket) { + throw new Error('未连接到服务器') + } try { - const res = (await this.socket?.timeout(ms('5m')).emitWithAck('enable', { + const res = (await this.socket.timeout(ms('5m')).emitWithAck('enable', { host: this.host, port: this.publicPort, version: this.version, @@ -515,48 +492,7 @@ export class Cluster { } logger.info(colors.rainbow('start doing my job')) - if (this.keepAliveInterval) { - clearTimeout(this.keepAliveInterval) - } - this.keepAliveInterval = setTimeout(() => { - void this._keepAlive() - }, ms('1m')) - } - - private async _keepAlive(): Promise { - logger.trace('start keep alive') - try { - const status = await pTimeout(this.keepAlive(), { - milliseconds: ms('10s'), - }) - if (!status) { - logger.fatal('kicked by server') - this.exit(1) - } - this.keepAliveError = 0 - } catch (e) { - this.keepAliveError++ - logger.error('keep alive error') - if (this.keepAliveError >= 3) { - await Bluebird.try(async () => { - await this.disable() - await this.connect() - await this.enable() - }) - .timeout(ms('10m'), 'restart timeout') - .catch((e) => { - logger.error(e, 'restart failed') - this.exit(1) - }) - } - } finally { - if (this.keepAliveInterval) { - clearTimeout(this.keepAliveInterval) - } - this.keepAliveInterval = setTimeout(() => { - void this._keepAlive() - }, ms('1m')) - } + this.keepalive.start(this.socket) } private onConnectionError(event: string, err: Error): void { diff --git a/src/keepalive.ts b/src/keepalive.ts new file mode 100644 index 0000000..1846764 --- /dev/null +++ b/src/keepalive.ts @@ -0,0 +1,93 @@ +import Bluebird from 'bluebird' +import {clone} from 'lodash-es' +import ms from 'ms' +import {clearTimeout} from 'node:timers' +import pTimeout from 'p-timeout' +import prettyBytes from 'pretty-bytes' +import {Socket} from 'socket.io-client' +import {Cluster} from './cluster.js' +import {logger} from './logger.js' + +export class Keepalive { + public timer?: NodeJS.Timeout + private socket?: Socket + private keepAliveError = 0 + + constructor( + private readonly interval: number, + private readonly cluster: Cluster, + ) {} + + public start(socket: Socket): void { + this.socket = socket + this.schedule() + } + + public stop(): void { + if (this.timer) { + clearTimeout(this.timer) + } + } + + private schedule(): void { + if (this.timer) { + clearTimeout(this.timer) + } + this.timer = setTimeout(() => { + logger.trace('start keep alive') + void this.emitKeepAlive() + }, this.interval) + } + + private async emitKeepAlive(): Promise { + try { + const status = await pTimeout(this.keepAlive(), { + milliseconds: ms('10s'), + }) + if (!status) { + logger.fatal('kicked by server') + this.cluster.exit(1) + } + this.keepAliveError = 0 + } catch (e) { + this.keepAliveError++ + logger.error(e, 'keep alive error') + if (this.keepAliveError >= 1) { + await Bluebird.try(async () => { + await this.cluster.disable() + await this.cluster.connect() + await this.cluster.enable() + }) + .timeout(ms('10m'), 'restart timeout') + .catch((e) => { + logger.error(e, 'restart failed') + this.cluster.exit(1) + }) + } + } finally { + void this.schedule() + } + } + + private async keepAlive(): Promise { + if (!this.cluster.isEnabled) { + throw new Error('节点未启用') + } + if (!this.socket) { + throw new Error('未连接到服务器') + } + + const counters = clone(this.cluster.counters) + const [err, date] = (await this.socket.emitWithAck('keep-alive', { + time: new Date(), + ...counters, + })) as [object, unknown] + + if (err) throw new Error('keep alive error', {cause: err}) + const bytes = prettyBytes(counters.bytes, {binary: true}) + logger.info(`keep alive success, serve ${counters.hits} files, ${bytes}`) + this.cluster.counters.hits -= counters.hits + this.cluster.counters.bytes -= counters.bytes + return !!date + } +}