Skip to content

Commit

Permalink
refactor: 独立keepalive实现
Browse files Browse the repository at this point in the history
  • Loading branch information
bangbang93 committed Apr 3, 2024
1 parent aff481c commit 7ff8174
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 73 deletions.
82 changes: 9 additions & 73 deletions src/cluster.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {decompress} from '@mongodb-js/zstd'
import Bluebird from 'bluebird'
import {ChildProcess, spawn} from 'child_process'
import {MultiBar} from 'cli-progress'
import colors from 'colors/safe.js'
Expand All @@ -12,24 +11,22 @@ import {createServer, Server} from 'http'
import {createSecureServer} from 'http2'
import http2Express from 'http2-express-bridge'
import {Agent as HttpsAgent} from 'https'
import {clone, template, toString} from 'lodash-es'
import {template, toString} from 'lodash-es'
import morgan from 'morgan'
import ms from 'ms'
import {constants} from 'node:http2'
import {userInfo} from 'node:os'
import {clearTimeout} from 'node:timers'
import {tmpdir} from 'os'
import pMap from 'p-map'
import pRetry from 'p-retry'
import pTimeout from 'p-timeout'
import {basename, dirname, join} from 'path'
import prettyBytes from 'pretty-bytes'
import {connect, Socket} from 'socket.io-client'
import {Tail} from 'tail'
import {fileURLToPath} from 'url'
import {config, type OpenbmclapiAgentConfiguration, OpenbmclapiAgentConfigurationSchema} from './config.js'
import {FileListSchema} from './constants.js'
import {validateFile} from './file.js'
import {Keepalive} from './keepalive.js'
import {logger} from './logger.js'
import MeasureRouteFactory from './measure.route.js'
import {getStorage, type IStorage} from './storage/base.storage.js'
Expand All @@ -50,12 +47,10 @@ export class Cluster {
public readonly counters: ICounters = {hits: 0, bytes: 0}
public isEnabled = false
public wantEnable = false
public keepAliveInterval?: NodeJS.Timeout
public interval?: NodeJS.Timeout
public nginxProcess?: ChildProcess
public readonly storage: IStorage

private keepAliveError = 0
private readonly prefixUrl = process.env.CLUSTER_BMCLAPI ?? 'https://openbmclapi.bangbang93.com'
private readonly host?: string
private _port: number | string
Expand All @@ -64,6 +59,7 @@ export class Cluster {
private readonly got: Got
private readonly requestCache = new Map()
private readonly tmpDir = join(tmpdir(), 'openbmclapi')
private readonly keepalive = new Keepalive(ms('1m'), this)
private socket?: Socket

private server?: Server
Expand Down Expand Up @@ -419,7 +415,7 @@ export class Cluster {
}

public async disable(): Promise<void> {
clearTimeout(this.keepAliveInterval)
this.keepalive.stop()
this.wantEnable = false
return await new Promise((resolve, reject) => {
this.socket?.emit('disable', null, ([err, ack]: [unknown, unknown]) => {
Expand All @@ -445,28 +441,6 @@ export class Cluster {
})
}

public async keepAlive(): Promise<boolean> {
if (!this.isEnabled) {
throw new Error('节点未启用')
}
if (!this.socket) {
throw new Error('未连接到服务器')
}

const counters = clone(this.counters)
const [err, date] = (await this.socket.emitWithAck('keep-alive', {
time: new Date(),
...counters,
})) as [object, unknown]

if (err) throw new Error('keep alive error', {cause: err})
const bytes = prettyBytes(counters.bytes, {binary: true})
logger.info(`keep alive success, serve ${counters.hits} files, ${bytes}`)
this.counters.hits -= counters.hits
this.counters.bytes -= counters.bytes
return !!date
}

public async requestCert(): Promise<void> {
const cert = await new Promise<{cert: string; key: string}>((resolve, reject) => {
this.socket?.emit('request-cert', ([err, cert]: [unknown, {cert: string; key: string}]) => {
Expand All @@ -489,8 +463,11 @@ export class Cluster {
private async _enable(): Promise<void> {
let err: unknown
let ack: unknown
if (!this.socket) {
throw new Error('未连接到服务器')
}
try {
const res = (await this.socket?.timeout(ms('5m')).emitWithAck('enable', {
const res = (await this.socket.timeout(ms('5m')).emitWithAck('enable', {
host: this.host,
port: this.publicPort,
version: this.version,
Expand All @@ -515,48 +492,7 @@ export class Cluster {
}

logger.info(colors.rainbow('start doing my job'))
if (this.keepAliveInterval) {
clearTimeout(this.keepAliveInterval)
}
this.keepAliveInterval = setTimeout(() => {
void this._keepAlive()
}, ms('1m'))
}

private async _keepAlive(): Promise<void> {
logger.trace('start keep alive')
try {
const status = await pTimeout(this.keepAlive(), {
milliseconds: ms('10s'),
})
if (!status) {
logger.fatal('kicked by server')
this.exit(1)
}
this.keepAliveError = 0
} catch (e) {
this.keepAliveError++
logger.error('keep alive error')
if (this.keepAliveError >= 3) {
await Bluebird.try(async () => {
await this.disable()
await this.connect()
await this.enable()
})
.timeout(ms('10m'), 'restart timeout')
.catch((e) => {
logger.error(e, 'restart failed')
this.exit(1)
})
}
} finally {
if (this.keepAliveInterval) {
clearTimeout(this.keepAliveInterval)
}
this.keepAliveInterval = setTimeout(() => {
void this._keepAlive()
}, ms('1m'))
}
this.keepalive.start(this.socket)
}

private onConnectionError(event: string, err: Error): void {
Expand Down
93 changes: 93 additions & 0 deletions src/keepalive.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import Bluebird from 'bluebird'
import {clone} from 'lodash-es'
import ms from 'ms'
import {clearTimeout} from 'node:timers'
import pTimeout from 'p-timeout'
import prettyBytes from 'pretty-bytes'
import {Socket} from 'socket.io-client'
import {Cluster} from './cluster.js'
import {logger} from './logger.js'

export class Keepalive {
public timer?: NodeJS.Timeout
private socket?: Socket
private keepAliveError = 0

constructor(
private readonly interval: number,
private readonly cluster: Cluster,
) {}

public start(socket: Socket): void {
this.socket = socket
this.schedule()
}

public stop(): void {
if (this.timer) {
clearTimeout(this.timer)
}
}

private schedule(): void {
if (this.timer) {
clearTimeout(this.timer)
}
this.timer = setTimeout(() => {
logger.trace('start keep alive')
void this.emitKeepAlive()
}, this.interval)
}

private async emitKeepAlive(): Promise<void> {
try {
const status = await pTimeout(this.keepAlive(), {
milliseconds: ms('10s'),
})
if (!status) {
logger.fatal('kicked by server')
this.cluster.exit(1)
}
this.keepAliveError = 0
} catch (e) {
this.keepAliveError++
logger.error(e, 'keep alive error')
if (this.keepAliveError >= 1) {
await Bluebird.try(async () => {
await this.cluster.disable()
await this.cluster.connect()
await this.cluster.enable()
})
.timeout(ms('10m'), 'restart timeout')
.catch((e) => {
logger.error(e, 'restart failed')
this.cluster.exit(1)
})
}
} finally {
void this.schedule()
}
}

private async keepAlive(): Promise<boolean> {
if (!this.cluster.isEnabled) {
throw new Error('节点未启用')
}
if (!this.socket) {
throw new Error('未连接到服务器')
}

const counters = clone(this.cluster.counters)
const [err, date] = (await this.socket.emitWithAck('keep-alive', {
time: new Date(),
...counters,
})) as [object, unknown]

if (err) throw new Error('keep alive error', {cause: err})
const bytes = prettyBytes(counters.bytes, {binary: true})
logger.info(`keep alive success, serve ${counters.hits} files, ${bytes}`)
this.cluster.counters.hits -= counters.hits
this.cluster.counters.bytes -= counters.bytes
return !!date
}
}

0 comments on commit 7ff8174

Please sign in to comment.