Skip to content

Commit

Permalink
Fixes and debug logs for worker processses
Browse files Browse the repository at this point in the history
  • Loading branch information
jairajdev committed Aug 19, 2024
1 parent 88652bf commit d9a31f8
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 35 deletions.
23 changes: 23 additions & 0 deletions src/API.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ import { getGlobalNetworkAccount } from './GlobalAccount'
import { cycleRecordWithShutDownMode } from './Data/Cycles'
import { isDebugMiddleware } from './DebugMode'
import { Utils as StringUtils } from '@shardus/types'
import {
receivedReceiptCount,
verifiedReceiptCount,
successReceiptCount,
failureReceiptCount,
} from './primary-process'
const { version } = require('../package.json') // eslint-disable-line @typescript-eslint/no-var-requires

const TXID_LENGTH = 64
Expand Down Expand Up @@ -1080,6 +1086,23 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
}
)

server.get(
'/verified-receipt-counter',
{
preHandler: async (_request, reply) => {
isDebugMiddleware(_request, reply)
},
},
(_request, reply) => {
reply.send({
receivedReceiptCount,
verifiedReceiptCount,
successReceiptCount,
failureReceiptCount,
})
}
)

// Old snapshot ArchivedCycle endpoint;
if (!config.experimentalSnapshot) {
type FullArchiveRequest = FastifyRequest<{
Expand Down
8 changes: 8 additions & 0 deletions src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ export interface Config {
configChangeMaxCyclesToKeep: number
// the number of config changes to keep*/
configChangeMaxChangesToKeep: number
receiptLoadTrakerInterval: number // Interval to track the receipt load
receiptLoadTrakerLimit: number // Limit to track the receipt load
lastActivityCheckInterval: number // Interval to check last activity
lastActivityCheckTimeout: number // Timeout to check last activity
}

let config: Config = {
Expand Down Expand Up @@ -156,6 +160,10 @@ let config: Config = {
maxCyclesShardDataToKeep: 10,
configChangeMaxCyclesToKeep: 5,
configChangeMaxChangesToKeep: 1000,
receiptLoadTrakerInterval: 15 * 1000,
receiptLoadTrakerLimit: 10,
lastActivityCheckInterval: 15 * 1000,
lastActivityCheckTimeout: 30 * 1000,
}
// Override default config params from config file, env vars, and cli args
export async function overrideDefaultConfig(file: string): Promise<void> {
Expand Down
57 changes: 26 additions & 31 deletions src/primary-process/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Cluster, Worker } from 'cluster'
import { cpus } from 'os'
import { ArchiverReceipt } from '../dbstore/receipts'
import { verifyArchiverReceipt, ReceiptVerificationResult } from '../Data/Collector'
import { config } from '../Config'
import { EventEmitter } from 'events'

const MAX_WORKERS = cpus().length - 1 // Leaving 1 core for the master process
Expand All @@ -18,29 +19,25 @@ export interface ChildMessageInterface {
}
}

let receiptCount = 0 // Variable to keep track of the number of receipts received
let verifiedCount = 0 // Variable to keep track of the number of receipts verified
let successCount = 0 // Variable to keep track of the number of receipts successful verification
let failureCount = 0 // Variable to keep track of the number of receipts failed verification
export let receivedReceiptCount = 0 // Variable to keep track of the number of receipts received
export let verifiedReceiptCount = 0 // Variable to keep track of the number of receipts verified
export let successReceiptCount = 0 // Variable to keep track of the number of receipts successful verification
export let failureReceiptCount = 0 // Variable to keep track of the number of receipts failed verification

let receiptLoadTraker = 0 // Variable to keep track of the receipt load within the last receiptLoadTrakerInterval
const receiptLoadTrakerInterval: number = 15 * 1000 // Interval to track the receipt load
const receiptLoadTrakerLimit = 10 // Limit to track the receipt load
// Creating a worker pool
const workers: Worker[] = []
const extraWorkers = new Map<number, Worker>()
let currentWorker = 0

const receiptsInVerificationMap = new Map<string, number>()

const emitter = new EventEmitter()

const setupWorkerProcesses = (cluster: Cluster): void => {
export const setupWorkerProcesses = (cluster: Cluster): void => {
console.log(`Master ${process.pid} is running`)
// Set interval to check receipt count every 15 seconds
setInterval(() => {
if (receiptLoadTraker < receiptLoadTrakerLimit) {
console.log(`Receipt load is below the limit: ${receiptLoadTraker}/${receiptLoadTrakerLimit}`)
if (receiptLoadTraker < config.receiptLoadTrakerLimit) {
console.log(`Receipt load is below the limit: ${receiptLoadTraker}/${config.receiptLoadTrakerLimit}`)

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
// Kill the extra workers from the end of the array
for (let i = workers.length - 1; i >= 0; i--) {
// console.log(`Killing worker ${workers[i].process.pid} with index ${i}`);
Expand All @@ -53,7 +50,7 @@ const setupWorkerProcesses = (cluster: Cluster): void => {
}
return
}
let neededWorkers = Math.ceil(receiptLoadTraker / receiptLoadTrakerLimit)
let neededWorkers = Math.ceil(receiptLoadTraker / config.receiptLoadTrakerLimit)
if (neededWorkers > MAX_WORKERS) neededWorkers = MAX_WORKERS
const currentWorkers = workers.length
console.log(`Needed workers: ${neededWorkers}`, `Current workers: ${currentWorkers}`)
Expand All @@ -80,7 +77,7 @@ const setupWorkerProcesses = (cluster: Cluster): void => {
`Adjusted worker count to ${workers.length}, based on ${receiptLoadTraker} receipts received.`
)
receiptLoadTraker = 0 // Reset the count
}, receiptLoadTrakerInterval)
}, config.receiptLoadTrakerInterval)

Check failure

Code scanning / CodeQL

Resource exhaustion High

This creates a timer with a user-controlled duration from a
user-provided value
.
}

const setupWorkerListeners = (worker: Worker): void => {
Expand All @@ -91,13 +88,13 @@ const setupWorkerListeners = (worker: Worker): void => {
case 'receipt-verification': {
// const result = results.get(workerId)
// if (result) {
// verifiedCount++
// verifiedReceiptCount++
// if (data.success) {
// result.success++
// successCount++
// successReceiptCount++
// } else {
// result.failure++
// failureCount++
// failureReceiptCount++
// }
// }
const { txId, timestamp } = data
Expand Down Expand Up @@ -184,18 +181,11 @@ const forwardReceiptVerificationResult = (
// }

export const offloadReceipt = async (receipt: ArchiverReceipt): Promise<ReceiptVerificationResult> => {
receiptCount++ // Increment the counter for each receipt received
receivedReceiptCount++ // Increment the counter for each receipt received
receiptLoadTraker++ // Increment the receipt load tracker
let verificationResult: ReceiptVerificationResult
if (workers.length === 0) {
verificationResult = await verifyArchiverReceipt(receipt)
verifiedCount++
if (verificationResult.success) {
successCount++
} else {
failureCount++
}
return verificationResult
} else {
// Forward the request to a worker in a round-robin fashion
let worker = workers[currentWorker]
Expand Down Expand Up @@ -224,13 +214,18 @@ export const offloadReceipt = async (receipt: ArchiverReceipt): Promise<ReceiptV
type: 'receipt-verification',
data: { receipt },
})
verificationResult = await forwardReceiptVerificationResult(
receipt.tx.txId,
receipt.tx.timestamp,
worker
)
}
verifiedCount++
if (verificationResult.success) {
successCount++
} else {
failureCount++
}
return verificationResult
}
verifiedReceiptCount++
if (verificationResult.success) {
successReceiptCount++
} else {
failureReceiptCount++
}
return verificationResult
}
4 changes: 4 additions & 0 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import { loadGlobalAccounts, syncGlobalAccount } from './GlobalAccount'
import { setShutdownCycleRecord, cycleRecordWithShutDownMode } from './Data/Cycles'
import { registerRoutes } from './API'
import { Utils as StringUtils } from '@shardus/types'
import { setupWorkerProcesses } from './primary-process'
import { initWorkerProcess } from './worker-process'

const configFile = join(process.cwd(), 'archiver-config.json')
let logDir: string
Expand Down Expand Up @@ -75,6 +77,7 @@ async function start(): Promise<void> {
if (!cluster.isPrimary) {
// Initialize state from config
await State.initFromConfig(config)
await initWorkerProcess()
return
}

Expand Down Expand Up @@ -478,6 +481,7 @@ async function startServer(): Promise<void> {
Logger.mainLogger.debug('Archive-server has started.')
State.setActive()
Collector.scheduleMissingTxsDataQuery()
setupWorkerProcesses(cluster)
}
)
}
Expand Down
10 changes: 6 additions & 4 deletions src/worker-process/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { verifyArchiverReceipt, ReceiptVerificationResult } from '../Data/Collector'
import { ChildMessageInterface } from '../primary-process'
import { config } from '../Config'

export const initWorkerProcess = async (): Promise<void> => {
console.log(`Worker ${process.pid} started`)
let lastActivity = Date.now()
const lastActivityCheckInterval: number = 15 * 1000 // Interval to check last activity
const lastActivityCheckTimeout: number = 30 * 1000 // Timeout to check last activity

// Worker processes
process.on('message', async ({ type, data }: ChildMessageInterface) => {
Expand Down Expand Up @@ -42,11 +41,14 @@ export const initWorkerProcess = async (): Promise<void> => {
lastActivity = Date.now()
})
setInterval(() => {
if (Date.now() - lastActivity > lastActivityCheckTimeout) {
console.log(
`lastActivityCheckTimeout: ${config.lastActivityCheckTimeout}, lastActivityCheckInterval: ${config.lastActivityCheckInterval}`

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
)
if (Date.now() - lastActivity > config.lastActivityCheckTimeout) {
console.log(`Worker ${process.pid} is idle for more than 1 minute`)
process.send({ type: 'clild_close' })
}
}, lastActivityCheckInterval)
}, config.lastActivityCheckInterval)

Check failure

Code scanning / CodeQL

Resource exhaustion High

This creates a timer with a user-controlled duration from a
user-provided value
.
}

process.on('uncaughtException', (error) => {
Expand Down

0 comments on commit d9a31f8

Please sign in to comment.