diff --git a/src/API.ts b/src/API.ts index 5335754c..fcef3179 100644 --- a/src/API.ts +++ b/src/API.ts @@ -28,6 +28,12 @@ import { getGlobalNetworkAccount } from './GlobalAccount' import { cycleRecordWithShutDownMode } from './Data/Cycles' import { isDebugMiddleware } from './DebugMode' import { Utils as StringUtils } from '@shardus/types' +import { + receivedReceiptCount, + verifiedReceiptCount, + successReceiptCount, + failureReceiptCount, +} from './primary-process' const { version } = require('../package.json') // eslint-disable-line @typescript-eslint/no-var-requires const TXID_LENGTH = 64 @@ -1080,6 +1086,23 @@ export function registerRoutes(server: FastifyInstance { + isDebugMiddleware(_request, reply) + }, + }, + (_request, reply) => { + reply.send({ + receivedReceiptCount, + verifiedReceiptCount, + successReceiptCount, + failureReceiptCount, + }) + } + ) + // Old snapshot ArchivedCycle endpoint; if (!config.experimentalSnapshot) { type FullArchiveRequest = FastifyRequest<{ diff --git a/src/Config.ts b/src/Config.ts index 7ee6bb6a..4be93edc 100644 --- a/src/Config.ts +++ b/src/Config.ts @@ -82,6 +82,10 @@ export interface Config { configChangeMaxCyclesToKeep: number // the number of config changes to keep*/ configChangeMaxChangesToKeep: number + receiptLoadTrakerInterval: number // Interval to track the receipt load + receiptLoadTrakerLimit: number // Limit to track the receipt load + lastActivityCheckInterval: number // Interval to check last activity + lastActivityCheckTimeout: number // Timeout to check last activity } let config: Config = { @@ -156,6 +160,10 @@ let config: Config = { maxCyclesShardDataToKeep: 10, configChangeMaxCyclesToKeep: 5, configChangeMaxChangesToKeep: 1000, + receiptLoadTrakerInterval: 15 * 1000, + receiptLoadTrakerLimit: 10, + lastActivityCheckInterval: 15 * 1000, + lastActivityCheckTimeout: 30 * 1000, } // Override default config params from config file, env vars, and cli args export async function overrideDefaultConfig(file: string): Promise { diff --git a/src/primary-process/index.ts b/src/primary-process/index.ts index 91885345..e07071a4 100644 --- a/src/primary-process/index.ts +++ b/src/primary-process/index.ts @@ -2,6 +2,7 @@ import type { Cluster, Worker } from 'cluster' import { cpus } from 'os' import { ArchiverReceipt } from '../dbstore/receipts' import { verifyArchiverReceipt, ReceiptVerificationResult } from '../Data/Collector' +import { config } from '../Config' import { EventEmitter } from 'events' const MAX_WORKERS = cpus().length - 1 // Leaving 1 core for the master process @@ -18,29 +19,25 @@ export interface ChildMessageInterface { } } -let receiptCount = 0 // Variable to keep track of the number of receipts received -let verifiedCount = 0 // Variable to keep track of the number of receipts verified -let successCount = 0 // Variable to keep track of the number of receipts successful verification -let failureCount = 0 // Variable to keep track of the number of receipts failed verification +export let receivedReceiptCount = 0 // Variable to keep track of the number of receipts received +export let verifiedReceiptCount = 0 // Variable to keep track of the number of receipts verified +export let successReceiptCount = 0 // Variable to keep track of the number of receipts successful verification +export let failureReceiptCount = 0 // Variable to keep track of the number of receipts failed verification let receiptLoadTraker = 0 // Variable to keep track of the receipt load within the last receiptLoadTrakerInterval -const receiptLoadTrakerInterval: number = 15 * 1000 // Interval to track the receipt load -const receiptLoadTrakerLimit = 10 // Limit to track the receipt load // Creating a worker pool const workers: Worker[] = [] const extraWorkers = new Map() let currentWorker = 0 -const receiptsInVerificationMap = new Map() - const emitter = new EventEmitter() -const setupWorkerProcesses = (cluster: Cluster): void => { +export const setupWorkerProcesses = (cluster: Cluster): void => { console.log(`Master ${process.pid} is running`) // Set interval to check receipt count every 15 seconds setInterval(() => { - if (receiptLoadTraker < receiptLoadTrakerLimit) { - console.log(`Receipt load is below the limit: ${receiptLoadTraker}/${receiptLoadTrakerLimit}`) + if (receiptLoadTraker < config.receiptLoadTrakerLimit) { + console.log(`Receipt load is below the limit: ${receiptLoadTraker}/${config.receiptLoadTrakerLimit}`) // Kill the extra workers from the end of the array for (let i = workers.length - 1; i >= 0; i--) { // console.log(`Killing worker ${workers[i].process.pid} with index ${i}`); @@ -53,7 +50,7 @@ const setupWorkerProcesses = (cluster: Cluster): void => { } return } - let neededWorkers = Math.ceil(receiptLoadTraker / receiptLoadTrakerLimit) + let neededWorkers = Math.ceil(receiptLoadTraker / config.receiptLoadTrakerLimit) if (neededWorkers > MAX_WORKERS) neededWorkers = MAX_WORKERS const currentWorkers = workers.length console.log(`Needed workers: ${neededWorkers}`, `Current workers: ${currentWorkers}`) @@ -80,7 +77,7 @@ const setupWorkerProcesses = (cluster: Cluster): void => { `Adjusted worker count to ${workers.length}, based on ${receiptLoadTraker} receipts received.` ) receiptLoadTraker = 0 // Reset the count - }, receiptLoadTrakerInterval) + }, config.receiptLoadTrakerInterval) } const setupWorkerListeners = (worker: Worker): void => { @@ -91,13 +88,13 @@ const setupWorkerListeners = (worker: Worker): void => { case 'receipt-verification': { // const result = results.get(workerId) // if (result) { - // verifiedCount++ + // verifiedReceiptCount++ // if (data.success) { // result.success++ - // successCount++ + // successReceiptCount++ // } else { // result.failure++ - // failureCount++ + // failureReceiptCount++ // } // } const { txId, timestamp } = data @@ -184,18 +181,11 @@ const forwardReceiptVerificationResult = ( // } export const offloadReceipt = async (receipt: ArchiverReceipt): Promise => { - receiptCount++ // Increment the counter for each receipt received + receivedReceiptCount++ // Increment the counter for each receipt received receiptLoadTraker++ // Increment the receipt load tracker let verificationResult: ReceiptVerificationResult if (workers.length === 0) { verificationResult = await verifyArchiverReceipt(receipt) - verifiedCount++ - if (verificationResult.success) { - successCount++ - } else { - failureCount++ - } - return verificationResult } else { // Forward the request to a worker in a round-robin fashion let worker = workers[currentWorker] @@ -224,13 +214,18 @@ export const offloadReceipt = async (receipt: ArchiverReceipt): Promise { if (!cluster.isPrimary) { // Initialize state from config await State.initFromConfig(config) + await initWorkerProcess() return } @@ -478,6 +481,7 @@ async function startServer(): Promise { Logger.mainLogger.debug('Archive-server has started.') State.setActive() Collector.scheduleMissingTxsDataQuery() + setupWorkerProcesses(cluster) } ) } diff --git a/src/worker-process/index.ts b/src/worker-process/index.ts index 3a135a65..98485e91 100644 --- a/src/worker-process/index.ts +++ b/src/worker-process/index.ts @@ -1,11 +1,10 @@ import { verifyArchiverReceipt, ReceiptVerificationResult } from '../Data/Collector' import { ChildMessageInterface } from '../primary-process' +import { config } from '../Config' export const initWorkerProcess = async (): Promise => { console.log(`Worker ${process.pid} started`) let lastActivity = Date.now() - const lastActivityCheckInterval: number = 15 * 1000 // Interval to check last activity - const lastActivityCheckTimeout: number = 30 * 1000 // Timeout to check last activity // Worker processes process.on('message', async ({ type, data }: ChildMessageInterface) => { @@ -42,11 +41,14 @@ export const initWorkerProcess = async (): Promise => { lastActivity = Date.now() }) setInterval(() => { - if (Date.now() - lastActivity > lastActivityCheckTimeout) { + console.log( + `lastActivityCheckTimeout: ${config.lastActivityCheckTimeout}, lastActivityCheckInterval: ${config.lastActivityCheckInterval}` + ) + if (Date.now() - lastActivity > config.lastActivityCheckTimeout) { console.log(`Worker ${process.pid} is idle for more than 1 minute`) process.send({ type: 'clild_close' }) } - }, lastActivityCheckInterval) + }, config.lastActivityCheckInterval) } process.on('uncaughtException', (error) => {