Skip to content

Commit

Permalink
Fix: Monika Multi Node (#1233)
Browse files Browse the repository at this point in the history
* refactor: remove send online status

* refactor: remove unused hasConnectionToSymon variable

* refactor: extract fetchAndApplyProbeChanges function

* feat: add probe assignment changes checker

* fix: add total probe assignment to check changes
  • Loading branch information
haricnugraha authored Feb 1, 2024
1 parent 3dc57cb commit 040c741
Showing 1 changed file with 86 additions and 54 deletions.
140 changes: 86 additions & 54 deletions src/symon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type ProbeChange = {
lastEvent: LastEvent
}

type ProbeAssignmentTotal = { total: number; updatedAt?: Date }

const getHandshakeData = async (): Promise<SymonHandshakeData> => {
await retry(handleAll, {
backoff: new ExponentialBackoff(),
Expand Down Expand Up @@ -153,10 +155,15 @@ export default class SymonClient {
private worker
private apiKey: string
private probeChangesInterval: NodeJS.Timeout | undefined
private hasConnectionToSymon: boolean = false
private probeAssignmentChangesInterval: NodeJS.Timeout | undefined
private httpClient: AxiosInstance
private locationId: string
private monikaId: string
private isMultiNode: boolean
private probeAssignmentTotal: ProbeAssignmentTotal = {
total: 0,
}

private probeChangesCheckedAt: Date | undefined
private reportProbesLimit: number
private reportTimeout: NodeJS.Timeout | undefined
Expand All @@ -182,6 +189,7 @@ export default class SymonClient {
})
this.locationId = symonLocationId
this.monikaId = symonMonikaId
this.isMultiNode = apiVersion === SYMON_API_VERSION.v2
this.reportProbesInterval = symonReportInterval
this.reportProbesLimit = symonReportLimit
this.worker = new Piscina.Piscina({
Expand All @@ -197,56 +205,24 @@ export default class SymonClient {
this.monikaId = await this.handshake()
log.info('[Symon] Handshake')

this.sendStatus({ isOnline: true })
.then(() => {
log.info('[Symon] Send status succeed')
})
.catch((error) => {
log.error(`[Symon] Send status failed. ${(error as Error).message}`)
})

const probeChangesCheckedAt = new Date()

await this.fetchProbesAndUpdateConfig()
this.setProbeChangesCheckedAt(probeChangesCheckedAt)

this.probeChangesInterval = setInterval(async () => {
const probeChangesCheckedAt = new Date()
try {
const probeChanges = await this.probeChanges.bind(this)()
this.setProbeChangesCheckedAt(probeChangesCheckedAt)

const hasProbeChanges = probeChanges.length > 0
if (!hasProbeChanges) {
log.info(
`[Symon] No probe changes since ${this.probeChangesCheckedAt}`
)
return
}

const probeChangesApplyResults = await applyProbeChanges(probeChanges)
for (const result of probeChangesApplyResults) {
if (result.status === 'rejected') {
log.error(
`[Symon] Get probe changes since ${this.probeChangesCheckedAt}. ${result.reason}`
)
}
}
this.setProbeChangesCheckedAt(probeChangesCheckedAt)
this.probeChangesInterval = setInterval(
this.fetchAndApplyProbeChanges.bind(this),
getContext().flags.symonGetProbesIntervalMs
)

log.info(
`[Symon] Get probe changes (${probeChanges.length}) since ${this.probeChangesCheckedAt}`
)
} catch (error) {
log.error(
`[Symon] Get probe changes since ${
this.probeChangesCheckedAt
} failed. ${(error as Error).message}`
)
}
}, getContext().flags.symonGetProbesIntervalMs)
if (this.isMultiNode) {
this.probeAssignmentChangesInterval = setInterval(
this.fetchAndApplyProbeAssignmentChanges.bind(this),
getContext().flags.symonGetProbesIntervalMs
)
}

this.report().catch((error) => {
this.hasConnectionToSymon = false
log.error(`[Symon] Report failed. ${(error as Error).message}`)
})

Expand All @@ -257,18 +233,14 @@ export default class SymonClient {
}

async sendStatus({ isOnline }: { isOnline: boolean }): Promise<void> {
const { status } = await this.httpClient({
await this.httpClient({
data: {
monikaId: this.monikaId,
status: isOnline,
},
method: 'POST',
url: '/status',
})

if (status === 200) {
this.hasConnectionToSymon = true
}
}

async stop(): Promise<void> {
Expand All @@ -277,6 +249,7 @@ export default class SymonClient {
}

clearInterval(this.probeChangesInterval)
clearInterval(this.probeAssignmentChangesInterval)
clearTimeout(this.reportTimeout)

await this.worker.destroy()
Expand All @@ -286,6 +259,40 @@ export default class SymonClient {
this.probeChangesCheckedAt = probeChangesCheckedAt
}

private async fetchAndApplyProbeChanges() {
const probeChangesCheckedAt = new Date()

try {
const probeChanges = await this.probeChanges()
this.setProbeChangesCheckedAt(probeChangesCheckedAt)

const hasProbeChanges = probeChanges.length > 0
if (!hasProbeChanges) {
log.info(`[Symon] No probe changes since ${this.probeChangesCheckedAt}`)
return
}

const probeChangesApplyResults = await applyProbeChanges(probeChanges)
for (const result of probeChangesApplyResults) {
if (result.status === 'rejected') {
log.error(
`[Symon] Get probe changes since ${this.probeChangesCheckedAt}. ${result.reason}`
)
}
}

log.info(
`[Symon] Get probe changes (${probeChanges.length}) since ${this.probeChangesCheckedAt}`
)
} catch (error) {
log.error(
`[Symon] Get probe changes since ${
this.probeChangesCheckedAt
} failed. ${(error as Error).message}`
)
}
}

private willSendEventListener({
probeState,
validation,
Expand Down Expand Up @@ -328,7 +335,6 @@ export default class SymonClient {
// Create a task data object
const taskData = {
apiKey: this.apiKey,
hasConnectionToSymon: this.hasConnectionToSymon,
httpClient: this.httpClient,
monikaId: this.monikaId,
probeIds: getProbes().map(({ id }) => id),
Expand All @@ -345,7 +351,6 @@ export default class SymonClient {
this.report
.bind(this)()
.catch((error) => {
this.hasConnectionToSymon = false
log.error(`[Symon] Report failed. ${(error as Error).message}`)
})
}, this.reportProbesInterval)
Expand Down Expand Up @@ -407,10 +412,8 @@ export default class SymonClient {
// Fetch the probes
const { hash, probes } = await this.fetchProbes()
const newConfig: Config = { probes, version: hash }
await setConfig(newConfig)

// Set connection to symon as true, because it could fetch the probes
this.hasConnectionToSymon = true
await setConfig(newConfig)
log.info('[Symon] Get probes succeed')
}

Expand Down Expand Up @@ -439,6 +442,35 @@ export default class SymonClient {
.post('/client-handshake', handshakeData)
.then((res) => res.data?.data.monikaId)
}

private async fetchAndApplyProbeAssignmentChanges(): Promise<void> {
const probeAssignmentTotal = await this.fetchProbeAssignmentTotal()
const { total, updatedAt } = probeAssignmentTotal

if (
total !== this.probeAssignmentTotal.total ||
updatedAt !== this.probeAssignmentTotal.updatedAt
) {
await this.fetchProbesAndUpdateConfig()
this.setProbeAssignmentTotal(probeAssignmentTotal)
log.info('[Symon] The probe assignment has been updated')
}
}

private setProbeAssignmentTotal(probeAssignmentTotal: ProbeAssignmentTotal) {
this.probeAssignmentTotal = probeAssignmentTotal
}

private async fetchProbeAssignmentTotal(): Promise<{
total: number
updatedAt: Date
}> {
const response = await this.httpClient.get<{
data: { total: number; updatedAt: Date }
}>(`/${this.monikaId}/probe-assignments/total`)

return response.data.data
}
}

async function applyProbeChanges(probeChanges: ProbeChange[]) {
Expand Down

0 comments on commit 040c741

Please sign in to comment.