Skip to content

Commit

Permalink
Improve worker threads management
Browse files Browse the repository at this point in the history
  • Loading branch information
jairajdev committed Aug 30, 2024
1 parent 0a6f01c commit 8335cb1
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 43 deletions.
2 changes: 1 addition & 1 deletion src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ let config: Config = {
maxCyclesShardDataToKeep: 10,
configChangeMaxCyclesToKeep: 5,
configChangeMaxChangesToKeep: 1000,
receiptLoadTrakerInterval: 15 * 1000,
receiptLoadTrakerInterval: 10 * 1000,
receiptLoadTrakerLimit: 10,
lastActivityCheckInterval: 15 * 1000,
lastActivityCheckTimeout: 30 * 1000,
Expand Down
12 changes: 6 additions & 6 deletions src/Data/Collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -831,14 +831,14 @@ export const storeReceiptData = async (
if (profilerInstance) profilerInstance.profileSectionStart('Offload_receipt')
if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Offload_receipt')
const start_time = process.hrtime()
console.log('offloading receipt', txId, timestamp)
// console.log('offloading receipt', txId, timestamp)
const result = await offloadReceipt(txId, timestamp, requiredSignatures, receipt)
console.log('offload receipt result', txId, timestamp, result)
// console.log('offload receipt result', txId, timestamp, result)
const end_time = process.hrtime(start_time)
console.log(
`Time taken for receipt verification in millisecond is: `,
end_time[0] * 1000 + end_time[1] / 1000000
)
const time_taken = end_time[0] * 1000 + end_time[1] / 1000000
if (time_taken > 100) {
console.log(`Time taken for receipt verification in millisecond is: `, txId, timestamp, time_taken)
}
if (profilerInstance) profilerInstance.profileSectionEnd('Offload_receipt')
if (result.success === false) {
receiptsInValidationMap.delete(txId)
Expand Down
4 changes: 2 additions & 2 deletions src/dbstore/sqlite3storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ export const createDB = async (dbPath: string, dbName: string): Promise<Database
await run(db, 'PRAGMA journal_mode=WAL')
db.on('profile', (sql, time) => {
if (time > 500 && time < 1000) {
console.log('SLOW QUERY', sql, time)
console.log('SLOW QUERY', process.pid, sql, time)
} else if (time > 1000) {
console.log('VERY SLOW QUERY', sql, time)
console.log('VERY SLOW QUERY', process.pid, sql, time)
}
})
console.log(`Database ${dbName} Initialized!`)
Expand Down
86 changes: 56 additions & 30 deletions src/primary-process/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export let successReceiptCount = 0 // Variable to keep track of the number of re
export let failureReceiptCount = 0 // Variable to keep track of the number of receipts failed verification

let receiptLoadTraker = 0 // Variable to keep track of the receipt load within the last receiptLoadTrakerInterval
let mainProcessReceiptTracker = 0 // receipt tracker for the receipts getting verified in the main process
// Creating a worker pool
const workers: Worker[] = []
const newWorkers = new Map<number, Worker>()
Expand All @@ -38,11 +39,8 @@ let currentWorker = 0

const emitter = new EventEmitter()

let masterProcessId

export const setupWorkerProcesses = (cluster: Cluster): void => {
console.log(`Master ${process.pid} is running`)
masterProcessId = process.pid
// Set interval to check receipt count every 15 seconds
setInterval(async () => {
for (const [, worker] of newWorkers) {
Expand All @@ -60,30 +58,38 @@ export const setupWorkerProcesses = (cluster: Cluster): void => {
const worker = workers.pop()
if (worker) extraWorkers.set(worker.process.pid, worker)
}
receiptLoadTraker = 0
return
}
let neededWorkers = Math.ceil(receiptLoadTraker / config.receiptLoadTrakerLimit)
if (neededWorkers > MAX_WORKERS) neededWorkers = MAX_WORKERS
const currentWorkers = workers.length
console.log(`Needed workers: ${neededWorkers}`, `Current workers: ${currentWorkers}`)
if (neededWorkers > currentWorkers) {
for (let i = currentWorkers; i < neededWorkers; i++) {
const worker = cluster.fork()
newWorkers.set(worker.process.pid, worker)
// results.set(worker.process.pid, { success: 0, failure: 0 })
setupWorkerListeners(worker)
}
} else if (neededWorkers < currentWorkers) {
// Kill the extra workers from the end of the array
for (let i = currentWorkers - 1; i >= neededWorkers; i--) {
// console.log(`Killing worker ${workers[i].process.pid} with index ${i}`);
// workers[i].kill();
// workers.pop();
} else {
let neededWorkers = Math.ceil(receiptLoadTraker / config.receiptLoadTrakerLimit)
if (neededWorkers > MAX_WORKERS) neededWorkers = MAX_WORKERS
let currentWorkers = workers.length
console.log(`Needed workers: ${neededWorkers}`, `Current workers: ${currentWorkers}`)
if (neededWorkers > currentWorkers) {
if (extraWorkers.size > 0) {
console.log(`Extra workers available: ${extraWorkers.size}, moving them to workers list`)
// Move the extra workers to the workers list
for (const [pid, worker] of extraWorkers) {
workers.push(worker)
extraWorkers.delete(pid)
}
currentWorkers = workers.length
}
for (let i = currentWorkers; i < neededWorkers; i++) {
const worker = cluster.fork()
newWorkers.set(worker.process.pid, worker)
// results.set(worker.process.pid, { success: 0, failure: 0 })
setupWorkerListeners(worker)
}
} else if (neededWorkers < currentWorkers) {
// Kill the extra workers from the end of the array
for (let i = currentWorkers - 1; i >= neededWorkers; i--) {
// console.log(`Killing worker ${workers[i].process.pid} with index ${i}`);
// workers[i].kill();
// workers.pop();

// Instead of killing the worker, move it to the extraWorkers map
const worker = workers.pop()
if (worker) extraWorkers.set(worker.process.pid, worker)
// Instead of killing the worker, move it to the extraWorkers map
const worker = workers.pop()
if (worker) extraWorkers.set(worker.process.pid, worker)
}
}
}
console.log(
Expand Down Expand Up @@ -113,7 +119,7 @@ const setupWorkerListeners = (worker: Worker): void => {
// }
// }
const { txId, timestamp } = data
console.log('receipt-verification', txId + timestamp)
// console.log('receipt-verification', txId + timestamp)
emitter.emit(txId + timestamp, data.verificationResult)
break
}
Expand Down Expand Up @@ -149,7 +155,7 @@ const setupWorkerListeners = (worker: Worker): void => {
}
break
default:
if (type.includes('axm')) {
if (type && type.includes('axm')) {
if (config.VERBOSE) {
console.log(`Worker ${workerId} is sending axm message: ${type}`)
console.log(data)
Expand Down Expand Up @@ -197,7 +203,7 @@ const forwardReceiptVerificationResult = (
): Promise<ReceiptVerificationResult> => {
return new Promise((resolve) => {
emitter.on(txId + timestamp, (result: ReceiptVerificationResult) => {
console.log('forwardReceiptVerificationResult', txId, timestamp)
// console.log('forwardReceiptVerificationResult', txId, timestamp)
resolve(result)
})
worker.on('exit', () => {
Expand All @@ -221,10 +227,30 @@ export const offloadReceipt = async (
receivedReceiptCount++ // Increment the counter for each receipt received
receiptLoadTraker++ // Increment the receipt load tracker
let verificationResult: ReceiptVerificationResult
if (workers.length === 0 && mainProcessReceiptTracker > config.receiptLoadTrakerLimit) {
// If there are extra workers available, put them to the workers list
if (extraWorkers.size > 0) {
console.log(
`offloadReceipt - Extra workers available: ${extraWorkers.size}, moving them to workers list`
)
// Move the extra workers to the workers list
for (const [pid, worker] of extraWorkers) {
workers.push(worker)
extraWorkers.delete(pid)
}
}
// // If there are still no workers available, add randon wait time (0-1 second) and proceed
// if (workers.length === 0 && mainProcessReceiptTracker > config.receiptLoadTrakerLimit) {
// await Utils.sleep(Math.floor(Math.random() * 1000))
// }
}
if (workers.length === 0) {
mainProcessReceiptTracker++
console.log('Verifying on the main program 1', txId, timestamp)
verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures)
verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures)
mainProcessReceiptTracker--
} else {
mainProcessReceiptTracker = 0
// Forward the request to a worker in a round-robin fashion
let worker = workers[currentWorker]
currentWorker = (currentWorker + 1) % workers.length
Expand Down
8 changes: 4 additions & 4 deletions src/worker-process/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const initWorkerProcess = async (): Promise<void> => {
nestedCounterMessages: [],
}
try {
console.log(`Worker process ${process.pid} is verifying receipt`, receipt.tx.txId, receipt.tx.timestamp)
// console.log(`Worker process ${process.pid} is verifying receipt`, receipt.tx.txId, receipt.tx.timestamp)
verificationResult = await verifyArchiverReceipt(receipt, data.requiredSignatures)
} catch (error) {
console.error(`Error in Worker ${process.pid} while verifying receipt`, error)
Expand All @@ -54,9 +54,9 @@ export const initWorkerProcess = async (): Promise<void> => {
})
process.send({ type: 'child_ready' })
setInterval(() => {
console.log(
`lastActivityCheckTimeout: ${config.lastActivityCheckTimeout}, lastActivityCheckInterval: ${config.lastActivityCheckInterval}`
)
// console.log(
// `lastActivityCheckTimeout: ${config.lastActivityCheckTimeout}, lastActivityCheckInterval: ${config.lastActivityCheckInterval}`
// )
if (Date.now() - lastActivity > config.lastActivityCheckTimeout) {
console.log(`Worker ${process.pid} is idle for more than 1 minute`)
process.send({ type: 'child_close' })
Expand Down

0 comments on commit 8335cb1

Please sign in to comment.