diff --git a/src/Config.ts b/src/Config.ts index 4be93edc..c89ba630 100644 --- a/src/Config.ts +++ b/src/Config.ts @@ -50,6 +50,7 @@ export interface Config { maxValidatorsToServe: number limitToArchiversOnly: boolean verifyReceiptData: boolean + verifyReceiptSignaturesSeparately: boolean verifyAppReceiptData: boolean verifyAccountData: boolean skipGlobalTxReceiptVerification: boolean // To skip verification of global tx receipts for now @@ -134,6 +135,7 @@ let config: Config = { maxValidatorsToServe: 10, // max number of validators to serve accounts data during restore mode limitToArchiversOnly: true, verifyReceiptData: true, + verifyReceiptSignaturesSeparately: true, verifyAccountData: true, verifyAppReceiptData: true, skipGlobalTxReceiptVerification: true, diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index c98377b1..d48cee1e 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -58,7 +58,6 @@ export interface ReceiptVerificationResult { success: boolean failedReasons?: string[] nestedCounterMessages?: string[] - newReceipt?: Receipt.ArchiverReceipt } /** @@ -406,10 +405,8 @@ export const validateArchiverReceipt = (receipt: Receipt.ArchiverReceipt): boole export const verifyReceiptData = async ( receipt: Receipt.ArchiverReceipt, - checkReceiptRobust = true, - failedReasons = [], - nestedCounterMessages = [] -): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { + checkReceiptRobust = true +): Promise<{ success: boolean; requiredSignatures?: number; newReceipt?: Receipt.ArchiverReceipt }> => { const result = { success: false } // Check the signed nodes are part of the execution group nodes of the tx const { executionShardKey, cycle, appliedReceipt, globalModification } = receipt @@ -427,14 +424,14 @@ export const verifyReceiptData = async ( } const currentCycle = getCurrentCycleCounter() if (currentCycle - cycle > 2) { - failedReasons.push( + Logger.mainLogger.error( `Found receipt with cycle older than 2 cycles ${txId}, ${cycle}, ${timestamp}, ${currentCycle}` ) } const cycleShardData = shardValuesByCycle.get(cycle) if (!cycleShardData) { - failedReasons.push('Cycle shard data not found') - nestedCounterMessages.push('Cycle_shard_data_not_found') + Logger.mainLogger.error('Cycle shard data not found') + if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Cycle_shard_data_not_found') return result } // Determine the home partition index of the primary account (executionShardKey) @@ -450,118 +447,144 @@ export const verifyReceiptData = async ( ? Math.ceil(votingGroupCount * config.requiredVotesPercentage) : Math.round(votingGroupCount * config.requiredVotesPercentage) if (signatures.length < requiredSignatures) { - failedReasons.push( + Logger.mainLogger.error( `Invalid receipt appliedReceipt signatures count is less than requiredSignatures, ${signatures.length}, ${requiredSignatures}` ) - nestedCounterMessages.push( - 'Invalid_receipt_appliedReceipt_signatures_count_less_than_requiredSignatures' - ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_appliedReceipt_signatures_count_less_than_requiredSignatures' + ) return result } - // Refer to https://github.com/shardeum/shardus-core/blob/50b6d00f53a35996cd69210ea817bee068a893d6/src/state-manager/TransactionConsensus.ts#L2799 - const voteHash = calculateVoteHash(appliedVote) - // Refer to https://github.com/shardeum/shardus-core/blob/50b6d00f53a35996cd69210ea817bee068a893d6/src/state-manager/TransactionConsensus.ts#L2663 - const appliedVoteHash = { - txid: txId, - voteHash, - } - // Using a map to store the good signatures to avoid duplicates - const goodSignatures = new Map() + // Using a set to store the unique signatures to avoid duplicates + const uniqueSigners = new Set() for (const signature of signatures) { const { owner: nodePubKey } = signature // Get the node id from the public key const node = cycleShardData.nodes.find((node) => node.publicKey === nodePubKey) if (node == null) { - failedReasons.push( + Logger.mainLogger.error( `The node with public key ${nodePubKey} of the receipt ${txId}} with ${timestamp} is not in the active nodesList of cycle ${cycle}` ) - nestedCounterMessages.push('appliedReceipt_signature_owner_not_in_active_nodesList') + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'appliedReceipt_signature_owner_not_in_active_nodesList' + ) continue } // Check if the node is in the execution group if (!cycleShardData.parititionShardDataMap.get(homePartition).coveredBy[node.id]) { - failedReasons.push( + Logger.mainLogger.error( `The node with public key ${nodePubKey} of the receipt ${txId} with ${timestamp} is not in the execution group of the tx` ) - nestedCounterMessages.push('appliedReceipt_signature_node_not_in_execution_group_of_tx') + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'appliedReceipt_signature_node_not_in_execution_group_of_tx' + ) continue } - if (Crypto.verify({ ...appliedVoteHash, sign: signature })) { - goodSignatures.set(signature.owner, signature) - // Break the loop if the required number of good signatures are found - if (goodSignatures.size >= requiredSignatures) break - } + uniqueSigners.add(nodePubKey) } - if (goodSignatures.size < requiredSignatures) { - failedReasons.push( - `Invalid receipt appliedReceipt valid signatures count is less than requiredSignatures ${goodSignatures.size}, ${requiredSignatures}` - ) - nestedCounterMessages.push( - 'Invalid_receipt_appliedReceipt_valid_signatures_count_less_than_requiredSignatures' + if (uniqueSigners.size < requiredSignatures) { + Logger.mainLogger.error( + `Invalid receipt appliedReceipt valid signatures count is less than requiredSignatures ${uniqueSigners.size}, ${requiredSignatures}` ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_appliedReceipt_valid_signatures_count_less_than_requiredSignatures' + ) return result } - return { success: true } + return { success: true, requiredSignatures } } const { confirmOrChallenge } = appliedReceipt // Check if the appliedVote node is in the execution group if (!cycleShardData.nodeShardDataMap.has(appliedVote.node_id)) { - failedReasons.push('Invalid receipt appliedReceipt appliedVote node is not in the active nodesList') - nestedCounterMessages.push('Invalid_receipt_appliedVote_node_not_in_active_nodesList') + Logger.mainLogger.error('Invalid receipt appliedReceipt appliedVote node is not in the active nodesList') + if (nestedCountersInstance) + nestedCountersInstance.countEvent('receipt', 'Invalid_receipt_appliedVote_node_not_in_active_nodesList') return result } if (appliedVote.sign.owner !== cycleShardData.nodeShardDataMap.get(appliedVote.node_id).node.publicKey) { - failedReasons.push( + Logger.mainLogger.error( 'Invalid receipt appliedReceipt appliedVote node signature owner and node public key does not match' ) - nestedCounterMessages.push( - 'Invalid_receipt_appliedVote_node_signature_owner_and_node_public_key_does_not_match' - ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_appliedVote_node_signature_owner_and_node_public_key_does_not_match' + ) return result } if (!cycleShardData.parititionShardDataMap.get(homePartition).coveredBy[appliedVote.node_id]) { - failedReasons.push( + Logger.mainLogger.error( 'Invalid receipt appliedReceipt appliedVote node is not in the execution group of the tx' ) - nestedCounterMessages.push('Invalid_receipt_appliedVote_node_not_in_execution_group_of_tx') + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_appliedVote_node_not_in_execution_group_of_tx' + ) return result } if (!Crypto.verify(appliedVote)) { - failedReasons.push('Invalid receipt appliedReceipt appliedVote signature verification failed') - nestedCounterMessages.push('Invalid_receipt_appliedVote_signature_verification_failed') + Logger.mainLogger.error('Invalid receipt appliedReceipt appliedVote signature verification failed') + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_appliedVote_signature_verification_failed' + ) return result } // Check if the confirmOrChallenge node is in the execution group if (!cycleShardData.nodeShardDataMap.has(confirmOrChallenge.nodeId)) { - failedReasons.push( + Logger.mainLogger.error( 'Invalid receipt appliedReceipt confirmOrChallenge node is not in the active nodesList' ) - nestedCounterMessages.push('Invalid_receipt_confirmOrChallenge_node_not_in_active_nodesList') + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_confirmOrChallenge_node_not_in_active_nodesList' + ) return result } if ( confirmOrChallenge.sign.owner !== cycleShardData.nodeShardDataMap.get(confirmOrChallenge.nodeId).node.publicKey ) { - failedReasons.push( + Logger.mainLogger.error( 'Invalid receipt appliedReceipt confirmOrChallenge node signature owner and node public key does not match' ) - nestedCounterMessages.push( - 'Invalid_receipt_confirmOrChallenge_signature_owner_and_node_public_key_does_not_match' - ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_confirmOrChallenge_signature_owner_and_node_public_key_does_not_match' + ) return result } if (!cycleShardData.parititionShardDataMap.get(homePartition).coveredBy[confirmOrChallenge.nodeId]) { - failedReasons.push( + Logger.mainLogger.error( 'Invalid receipt appliedReceipt confirmOrChallenge node is not in the execution group of the tx' ) - nestedCounterMessages.push('Invalid_receipt_confirmOrChallenge_node_not_in_execution_group_of_tx') + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_confirmOrChallenge_node_not_in_execution_group_of_tx' + ) return result } if (!Crypto.verify(confirmOrChallenge)) { - failedReasons.push('Invalid receipt appliedReceipt confirmOrChallenge signature verification failed') - nestedCounterMessages.push('Invalid_receipt_confirmOrChallenge_signature_verification_failed') + Logger.mainLogger.error('Invalid receipt appliedReceipt confirmOrChallenge signature verification failed') + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_confirmOrChallenge_signature_verification_failed' + ) return result } @@ -577,11 +600,51 @@ export const verifyReceiptData = async ( : Math.ceil(config.RECEIPT_CONFIRMATIONS / 2) // 3 out of 5 nodes const { success, newReceipt } = await isReceiptRobust(receipt, executionGroupNodes, minConfirmations) if (!success) { - failedReasons.push('Invalid receipt: Robust check failed') - nestedCounterMessages.push('Invalid_receipt_robust_check_failed') + Logger.mainLogger.error('Invalid receipt: Robust check failed') + if (nestedCountersInstance) + nestedCountersInstance.countEvent('receipt', 'Invalid_receipt_robust_check_failed') + return result + } + if (newReceipt) return { success: true, requiredSignatures: 0, newReceipt } + return { success: true } +} + +const verifyAppliedReceiptSignatures = ( + receipt: Receipt.ArchiverReceipt, + requiredSignatures: number, + failedReasons = [], + nestedCounterMessages = [] +): { success: boolean } => { + const result = { success: false, failedReasons, nestedCounterMessages } + const { appliedReceipt, globalModification } = receipt + if (globalModification && config.skipGlobalTxReceiptVerification) return { success: true } + const { appliedVote, signatures } = appliedReceipt + const { txId } = receipt.tx + // Refer to https://github.com/shardeum/shardus-core/blob/50b6d00f53a35996cd69210ea817bee068a893d6/src/state-manager/TransactionConsensus.ts#L2799 + const voteHash = calculateVoteHash(appliedVote) + // Refer to https://github.com/shardeum/shardus-core/blob/50b6d00f53a35996cd69210ea817bee068a893d6/src/state-manager/TransactionConsensus.ts#L2663 + const appliedVoteHash = { + txid: txId, + voteHash, + } + // Using a map to store the good signatures to avoid duplicates + const goodSignatures = new Map() + for (const signature of signatures) { + if (Crypto.verify({ ...appliedVoteHash, sign: signature })) { + goodSignatures.set(signature.owner, signature) + // Break the loop if the required number of good signatures are found + if (goodSignatures.size >= requiredSignatures) break + } + } + if (goodSignatures.size < requiredSignatures) { + failedReasons.push( + `Invalid receipt appliedReceipt valid signatures count is less than requiredSignatures ${goodSignatures.size}, ${requiredSignatures}` + ) + nestedCounterMessages.push( + 'Invalid_receipt_appliedReceipt_valid_signatures_count_less_than_requiredSignatures' + ) return result } - if (newReceipt) return { success: true, newReceipt } return { success: true } } @@ -615,28 +678,13 @@ const calculateVoteHash = (vote: Receipt.AppliedVote, failedReasons = []): strin } export const verifyArchiverReceipt = async ( - receipt: Receipt.ArchiverReceipt + receipt: Receipt.ArchiverReceipt, + requiredSignatures: number ): Promise => { const { txId, timestamp } = receipt.tx const existingReceipt = await Receipt.queryReceiptByReceiptId(txId) const failedReasons = [] const nestedCounterMessages = [] - if ( - config.usePOQo === false && - existingReceipt && - receipt.appliedReceipt && - receipt.appliedReceipt.confirmOrChallenge && - receipt.appliedReceipt.confirmOrChallenge.message === 'challenge' - ) { - // If the existing receipt is confirmed, and the new receipt is challenged, then skip saving the new receipt - if (existingReceipt.appliedReceipt.confirmOrChallenge.message === 'confirm') { - failedReasons.push( - `Existing receipt is confirmed, but new receipt is challenged ${txId}, ${receipt.cycle}, ${timestamp}` - ) - nestedCounterMessages.push('Existing_receipt_is_confirmed_but_new_receipt_is_challenged') - return { success: false, failedReasons, nestedCounterMessages } - } - } if (config.verifyAppReceiptData) { // if (profilerInstance) profilerInstance.profileSectionStart('Verify_app_receipt_data') // if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Verify_app_receipt_data') @@ -673,24 +721,23 @@ export const verifyArchiverReceipt = async ( return { success: false, failedReasons, nestedCounterMessages } } } - if (config.verifyReceiptData) { - // if (profilerInstance) profilerInstance.profileSectionStart('Verify_receipt_data') - // if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Verify_receipt_data') - const { success, newReceipt } = await verifyReceiptData( + if (config.verifyReceiptSignaturesSeparately) { + // if (profilerInstance) profilerInstance.profileSectionStart('Verify_receipt_signatures_data') + // if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Verify_receipt_signatures_data') + const { success } = verifyAppliedReceiptSignatures( receipt, - true, + requiredSignatures, failedReasons, nestedCounterMessages ) - // if (profilerInstance) profilerInstance.profileSectionEnd('Verify_receipt_data') + // if (profilerInstance) profilerInstance.profileSectionEnd('Verify_receipt_signatures_data') if (!success) { failedReasons.push(`Invalid receipt: Verification failed ${txId}, ${receipt.cycle}, ${timestamp}`) nestedCounterMessages.push('Invalid_receipt_verification_failed') return { success: false, failedReasons, nestedCounterMessages } } - return { success: true, failedReasons, nestedCounterMessages, newReceipt } } - return { success: true } + return { success: true, failedReasons, nestedCounterMessages } } export const storeReceiptData = async ( @@ -730,21 +777,68 @@ export const storeReceiptData = async ( continue } - const stringifiedReceipt = StringUtils.safeStringify(receipt) if (verifyData) { - const result = await offloadReceipt(txId, timestamp, stringifiedReceipt, receipt) - if (result.success === false) { - receiptsInValidationMap.delete(txId) - for (const message of result.failedReasons) { - Logger.mainLogger.error(message) + if (config.usePOQo === false) { + const existingReceipt = await Receipt.queryReceiptByReceiptId(txId) + if ( + existingReceipt && + receipt.appliedReceipt && + receipt.appliedReceipt.confirmOrChallenge && + receipt.appliedReceipt.confirmOrChallenge.message === 'challenge' + ) { + // If the existing receipt is confirmed, and the new receipt is challenged, then skip saving the new receipt + if (existingReceipt.appliedReceipt.confirmOrChallenge.message === 'confirm') { + Logger.mainLogger.error( + `Existing receipt is confirmed, but new receipt is challenged ${txId}, ${receipt.cycle}, ${timestamp}` + ) + receiptsInValidationMap.delete(txId) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Existing_receipt_is_confirmed_but_new_receipt_is_challenged' + ) + if (profilerInstance) profilerInstance.profileSectionEnd('Validate_receipt') + continue + } } - for (const message of result.nestedCounterMessages) { - if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', message) + } + + if (config.verifyReceiptData) { + const { success, requiredSignatures, newReceipt } = await verifyReceiptData(receipt) + if (!success) { + Logger.mainLogger.error('Invalid receipt: Verification failed', txId, receipt.cycle, timestamp) + receiptsInValidationMap.delete(txId) + if (nestedCountersInstance) + nestedCountersInstance.countEvent('receipt', 'Invalid_receipt_verification_failed') + if (profilerInstance) profilerInstance.profileSectionEnd('Validate_receipt') + continue + } + if (newReceipt) receipt = newReceipt + + if (profilerInstance) profilerInstance.profileSectionStart('Offload_receipt') + if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Offload_receipt') + const start_time = process.hrtime(); + console.log('offloading receipt', txId, timestamp) + const result = await offloadReceipt(txId, timestamp, requiredSignatures, receipt) + console.log('offload receipt result', txId, timestamp, result) + const end_time = process.hrtime(start_time); + console.log( + `Time taken for receipt verification in millisecond is: `, + end_time[0] * 1000 + end_time[1] / 1000000 + ); + if (profilerInstance) profilerInstance.profileSectionEnd('Offload_receipt') + if (result.success === false) { + receiptsInValidationMap.delete(txId) + for (const message of result.failedReasons) { + Logger.mainLogger.error(message) + } + for (const message of result.nestedCounterMessages) { + if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', message) + } + if (profilerInstance) profilerInstance.profileSectionEnd('Validate_receipt') + continue } - if (profilerInstance) profilerInstance.profileSectionEnd('Validate_receipt') - continue } - if (result.newReceipt) receipt = result.newReceipt } if (profilerInstance) profilerInstance.profileSectionEnd('Validate_receipt') // await Receipt.insertReceipt({ diff --git a/src/State.ts b/src/State.ts index 8423ff16..cd29993f 100644 --- a/src/State.ts +++ b/src/State.ts @@ -40,7 +40,7 @@ export let isFirst = false export let isActive = false export const archiversReputation: Map = new Map() -export async function initFromConfig(config: Config, shutDownMode = false): Promise { +export async function initFromConfig(config: Config, shutDownMode = false, useArchiverDiscovery = true): Promise { // Get own nodeInfo from config nodeState.ip = config.ARCHIVER_IP nodeState.port = config.ARCHIVER_PORT @@ -49,6 +49,8 @@ export async function initFromConfig(config: Config, shutDownMode = false): Prom nodeState.curvePk = Crypto.core.convertPkToCurve(nodeState.publicKey) nodeState.curveSk = Crypto.core.convertSkToCurve(nodeState.secretKey) + if (useArchiverDiscovery === false) return + let existingArchivers: ArchiverNodeInfo[] = [] // Parse existing archivers list try { diff --git a/src/primary-process/index.ts b/src/primary-process/index.ts index 96006923..94aada51 100644 --- a/src/primary-process/index.ts +++ b/src/primary-process/index.ts @@ -4,15 +4,15 @@ import { ArchiverReceipt } from '../dbstore/receipts' import { verifyArchiverReceipt, ReceiptVerificationResult } from '../Data/Collector' import { config } from '../Config' import { EventEmitter } from 'events' -import { shardValuesByCycle } from '../Data/Cycles' -import { StateManager } from '@shardus/types' +import { StateManager, Utils as StringUtils } from '@shardus/types' const MAX_WORKERS = cpus().length - 1 // Leaving 1 core for the master process export interface ChildMessageInterface { type: string data: { - receipt?: string + stringifiedReceipt?: string + requiredSignatures?: number success?: boolean err?: string txId?: string @@ -31,6 +31,7 @@ export let failureReceiptCount = 0 // Variable to keep track of the number of re let receiptLoadTraker = 0 // Variable to keep track of the receipt load within the last receiptLoadTrakerInterval // Creating a worker pool const workers: Worker[] = [] +const newWorkers = new Map() const extraWorkers = new Map() let currentWorker = 0 @@ -39,7 +40,10 @@ const emitter = new EventEmitter() export const setupWorkerProcesses = (cluster: Cluster): void => { console.log(`Master ${process.pid} is running`) // Set interval to check receipt count every 15 seconds - setInterval(() => { + setInterval(async () => { + for (const [,worker] of newWorkers) { + worker.kill() + } if (receiptLoadTraker < config.receiptLoadTrakerLimit) { console.log(`Receipt load is below the limit: ${receiptLoadTraker}/${config.receiptLoadTrakerLimit}`) // Kill the extra workers from the end of the array @@ -62,13 +66,7 @@ export const setupWorkerProcesses = (cluster: Cluster): void => { if (neededWorkers > currentWorkers) { for (let i = currentWorkers; i < neededWorkers; i++) { const worker = cluster.fork() - workers.push(worker) - for (const [cycle, shardValues] of shardValuesByCycle) { - worker.send({ - type: 'shardValuesByCycle', - data: { cycle, shardValues }, - }) - } + newWorkers.set(worker.process.pid, worker) // results.set(worker.process.pid, { success: 0, failure: 0 }) setupWorkerListeners(worker) } @@ -85,7 +83,7 @@ export const setupWorkerProcesses = (cluster: Cluster): void => { } } console.log( - `Adjusted worker count to ${workers.length}, based on ${receiptLoadTraker} receipts received.` + `Adjusted worker count to ${workers.length + newWorkers.size}, based on ${receiptLoadTraker} receipts received.` ) receiptLoadTraker = 0 // Reset the count }, config.receiptLoadTrakerInterval) @@ -109,10 +107,11 @@ const setupWorkerListeners = (worker: Worker): void => { // } // } const { txId, timestamp } = data + console.log('receipt-verification', txId + timestamp) emitter.emit(txId + timestamp, data.verificationResult) break } - case 'clild_close': + case 'child_close': console.log(`Worker ${workerId} is requesting to close`) // Check if the worker is in the extraWorkers map if (extraWorkers.has(workerId)) { @@ -132,6 +131,17 @@ const setupWorkerListeners = (worker: Worker): void => { } } break + case 'child_ready': + console.log(`Worker ${workerId} is ready for the duty`) + // Check if the worker is in the newWorkers map + if (newWorkers.has(workerId)) { + console.log(`Worker ${workerId} is in newWorkers, moving it to the workers list`) + workers.push(newWorkers.get(workerId)) + newWorkers.delete(workerId) + } else { + console.error(`Worker ${workerId}is not in the newWorkers list`) + } + break default: console.log(`Worker ${process.pid} is sending unknown message type: ${type}`) console.log(data) @@ -148,16 +158,21 @@ const setupWorkerListeners = (worker: Worker): void => { extraWorkers.get(workerId)?.kill() extraWorkers.delete(workerId) } + let isNewWorker = false + if(newWorkers.has(workerId)) { + console.log(`Worker ${workerId} is in newWorkers, removing it now`) + isNewWorker = true + newWorkers.get(workerId)?.kill() + newWorkers.delete(workerId) + } // Remove the worker from the workers list if not present in extraWorkers const workerIndex = workers.findIndex((worker) => worker.process.pid === workerId) if (workerIndex !== -1) { - if (isExtraWorker) { - console.error(`Worker ${workerId} is in workers list as well`) - } + if (isExtraWorker || isNewWorker) console.log(`Worker ${workerId} is in workers list as well`) workers[workerIndex]?.kill() workers.splice(workerIndex, 1) } else { - if (!isExtraWorker) console.error(`Worker ${workerId} is not in workers list`) + if (!isExtraWorker || !isNewWorker) console.error(`Worker ${workerId} is not in workers list`) } }) } @@ -169,13 +184,14 @@ const forwardReceiptVerificationResult = ( ): Promise => { return new Promise((resolve) => { emitter.on(txId + timestamp, (result: ReceiptVerificationResult) => { + console.log('forwardReceiptVerificationResult', txId, timestamp) resolve(result) }) worker.on('exit', () => { resolve({ success: false, failedReasons: [ - `Worker exited before sending the receipt verification result for ${txId} with timestamp ${timestamp}`, + `Worker ${worker.process.pid} exited before sending the receipt verification result for ${txId} with timestamp ${timestamp}`, ], nestedCounterMessages: ['Worker exited before sending the receipt verification result'], }) @@ -186,14 +202,15 @@ const forwardReceiptVerificationResult = ( export const offloadReceipt = async ( txId: string, timestamp: number, - receipt: string, - receipt2: ArchiverReceipt + requiredSignatures: number, + receipt: ArchiverReceipt ): Promise => { receivedReceiptCount++ // Increment the counter for each receipt received receiptLoadTraker++ // Increment the receipt load tracker let verificationResult: ReceiptVerificationResult if (workers.length === 0) { - verificationResult = await verifyArchiverReceipt(receipt2) + console.log('Verifying on the main program 1', txId, timestamp) + verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures) } else { // Forward the request to a worker in a round-robin fashion let worker = workers[currentWorker] @@ -203,20 +220,31 @@ export const offloadReceipt = async ( worker = workers[currentWorker] currentWorker = (currentWorker + 1) % workers.length if (worker) { + console.log('Verifying on the worker process 2', txId, timestamp, worker.process.pid) + const cloneReceipt = { ...receipt } + delete cloneReceipt.tx.originalTxData + delete cloneReceipt.executionShardKey + const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt) worker.send({ type: 'receipt-verification', - data: { receipt }, + data: { stringifiedReceipt, requiredSignatures }, }) verificationResult = await forwardReceiptVerificationResult(txId, timestamp, worker) } else { console.error('No worker available to process the receipt 2') // Verifying the receipt in the main thread - verificationResult = await verifyArchiverReceipt(receipt2) + console.log('Verifying on the main program 2', txId, timestamp) + verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures) } } else { + console.log('Verifying on the worker process 1', txId, timestamp, worker.process.pid) + const cloneReceipt = { ...receipt } + delete cloneReceipt.tx.originalTxData + delete cloneReceipt.executionShardKey + const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt) worker.send({ type: 'receipt-verification', - data: { receipt }, + data: { stringifiedReceipt, requiredSignatures }, }) verificationResult = await forwardReceiptVerificationResult(txId, timestamp, worker) } diff --git a/src/server.ts b/src/server.ts index 05ab778c..5a259c7e 100644 --- a/src/server.ts +++ b/src/server.ts @@ -76,7 +76,7 @@ async function start(): Promise { if (!cluster.isPrimary) { // Initialize state from config - await State.initFromConfig(config) + await State.initFromConfig(config, false, false) await initWorkerProcess() return } diff --git a/src/worker-process/index.ts b/src/worker-process/index.ts index 7d10a4d2..3166b5a1 100644 --- a/src/worker-process/index.ts +++ b/src/worker-process/index.ts @@ -3,7 +3,6 @@ import { ChildMessageInterface } from '../primary-process' import { config } from '../Config' import { Utils as StringUtils } from '@shardus/types' import { ArchiverReceipt } from '../dbstore/receipts' -import { cleanShardCycleData, shardValuesByCycle } from '../Data/Cycles' export const initWorkerProcess = async (): Promise => { console.log(`Worker ${process.pid} started`) @@ -13,11 +12,15 @@ export const initWorkerProcess = async (): Promise => { process.on('message', async ({ type, data }: ChildMessageInterface) => { switch (type) { case 'receipt-verification': { - if (!data.receipt) { + if (!data.stringifiedReceipt) { console.error(`Worker ${process.pid} received invalid receipt for verification`, data) return } - const receipt2 = StringUtils.safeJsonParse(data.receipt) as ArchiverReceipt + if (isNaN(data.requiredSignatures)) { + console.error(`Worker ${process.pid} received invalid requiredSignatures for verification`, data) + return + } + const receipt = StringUtils.safeJsonParse(data.stringifiedReceipt) as ArchiverReceipt // console.log(`Worker ${process.pid} verifying receipt`); let verificationResult: ReceiptVerificationResult = { success: false, @@ -25,7 +28,8 @@ export const initWorkerProcess = async (): Promise => { nestedCounterMessages: [], } try { - verificationResult = await verifyArchiverReceipt(receipt2) + console.log(`Worker process ${process.pid} is verifying receipt`, receipt.tx.txId, receipt.tx.timestamp) + verificationResult = await verifyArchiverReceipt(receipt, data.requiredSignatures) } catch (error) { console.error(`Error in Worker ${process.pid} while verifying receipt`, error) verificationResult.failedReasons.push('Error in Worker while verifying receipt') @@ -34,19 +38,13 @@ export const initWorkerProcess = async (): Promise => { process.send({ type: 'receipt-verification', data: { - txId: receipt2.tx.txId, - timestamp: receipt2.tx.timestamp, + txId: receipt.tx.txId, + timestamp: receipt.tx.timestamp, verificationResult, }, }) break } - case 'shardValuesByCycle': { - const { cycle, shardValues } = data - shardValuesByCycle.set(cycle, shardValues) - cleanShardCycleData(cycle - config.maxCyclesShardDataToKeep) - break - } default: console.log(`Worker ${process.pid} received unknown message type: ${type}`) console.log(data) @@ -54,13 +52,14 @@ export const initWorkerProcess = async (): Promise => { } lastActivity = Date.now() }) + process.send({ type: 'child_ready' }) setInterval(() => { console.log( `lastActivityCheckTimeout: ${config.lastActivityCheckTimeout}, lastActivityCheckInterval: ${config.lastActivityCheckInterval}` ) if (Date.now() - lastActivity > config.lastActivityCheckTimeout) { console.log(`Worker ${process.pid} is idle for more than 1 minute`) - process.send({ type: 'clild_close' }) + process.send({ type: 'child_close' }) } }, config.lastActivityCheckInterval) }