diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..4597d442 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,77 @@ +name: Node CI Workflow +# +# The parameters are defaulted at the org level but can be overridden on the repository. +# See the github-automation repo for more documentation +# +on: + push: + branches: + - dev + - main + pull_request: + branches: + - dev + - main + issue_comment: + inputs: + workflowBranch: + description: 'Branch of the reusable workflow. Defaults to main, select dev for testing only.' + required: true + default: 'main' + type: choice + options: + - dev + - main + workflow_dispatch: + inputs: + workflowBranch: + description: 'Branch of the reusable workflow. Defaults to main, select dev for testing only.' + required: true + default: 'main' + type: choice + options: + - dev + - main +jobs: + echo-inputs: + name: Repo Workflow Debugging + runs-on: ubuntu-latest + steps: + - name: Check Repo Vars + run: | + echo "*** Start - Check inputs in repo workflow ***" + echo "Node Version: ${{ vars.NODE_VERSION }}" + echo "Lint Required: ${{ vars.IS_LINT_REQUIRED }}" + echo "Format Check Required: ${{ vars.IS_FORMAT_CHECK_REQUIRED }}" + echo "Apply Patches Required: ${{ vars.IS_APPLY_PATCHES_REQUIRED }}" + echo "Unit Tests Required: ${{ vars.IS_UNIT_TESTS_REQUIRED }}" + echo "*** End - Check inputs in repo workflow ***" + ci-test-only: + if: ${{ github.event.inputs.workflowBranch == 'dev' }} + uses: shardeum/github-automation/.github/workflows/reusable-node-ci.yml@dev + permissions: + issues: write + pull-requests: write + contents: write + with: + node-version: ${{ vars.NODE_VERSION }} + lint-required: ${{ vars.IS_LINT_REQUIRED == 'true' }} + format-check-required: ${{ vars.IS_FORMAT_CHECK_REQUIRED == 'true' }} + apply-patches-required: ${{ vars.IS_APPLY_PATCHES_REQUIRED == 'true' }} + unit-tests-required: ${{ vars.IS_UNIT_TESTS_REQUIRED == 'true' }} + secrets: inherit + + ci: + if: ${{ 
github.event.inputs.workflowBranch == 'main' || !github.event.inputs.workflowBranch }} + uses: shardeum/github-automation/.github/workflows/reusable-node-ci.yml@main + permissions: + issues: write + pull-requests: write + contents: write + with: + node-version: ${{ vars.NODE_VERSION }} + lint-required: ${{ vars.IS_LINT_REQUIRED == 'true' }} + format-check-required: ${{ vars.IS_FORMAT_CHECK_REQUIRED == 'true' }} + apply-patches-required: ${{ vars.IS_APPLY_PATCHES_REQUIRED == 'true' }} + unit-tests-required: ${{ vars.IS_UNIT_TESTS_REQUIRED == 'true' }} + secrets: inherit diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index 1de80e8e..00000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,69 +0,0 @@ -# You can override the included template(s) by including variable overrides -# SAST customization: https://docs.gitlab.com/ee/user/application_security/sast/#customizing-the-sast-settings -# Secret Detection customization: https://docs.gitlab.com/ee/user/application_security/secret_detection/#customizing-settings -# Dependency Scanning customization: https://docs.gitlab.com/ee/user/application_security/dependency_scanning/#customizing-the-dependency-scanning-settings -# Container Scanning customization: https://docs.gitlab.com/ee/user/application_security/container_scanning/#customizing-the-container-scanning-settings -# Note that environment variables can be set in several places -# See https://docs.gitlab.com/ee/ci/variables/#cicd-variable-precedence - -# Include security-related templates -include: - - template: Security/Dependency-Scanning.gitlab-ci.yml - - template: Security/Secret-Detection.gitlab-ci.yml - - template: Security/SAST.gitlab-ci.yml - - remote: 'https://gitlab.com/pod_security/shared-ci/-/raw/main/security.yml' - -# Define the default Docker image for all jobs -image: registry.gitlab.com/shardus/dev-container - -# Define global cache settings for all jobs -cache: - key: '$CI_COMMIT_REF_SLUG-node-modules' - paths: - - node_modules/ - -# 
Define the stages for the pipeline -stages: - - prepare - - build - - appsec - - test - -# Prepare Job: Install Node.js dependencies -prepare-job: - stage: prepare - script: - - npm ci - -# Build Job: Compiles the code -compile-job: - stage: build - needs: ['prepare-job'] - script: - - echo "Running Compiler..." - - npm run compile - - echo "Compilation complete." - -# Format Checker Job: Runs Prettier for code formatting -format-checker-job: - stage: build - needs: ['prepare-job'] - script: - - echo "Running Prettier..." - - npm run format-check - - echo "Running Prettier complete." - -# Lint Checker Job: Runs ESlint for code linting -lint-checker-job: - stage: build - needs: ['prepare-job'] - script: - - echo "Running ESlint..." - - npm run lint - - echo "Running ESlint complete." - -# SAST Job: Performs static application security testing -sast: - variables: - SAST_EXCLUDED_ANALYZERS: bandit, brakeman, flawfinder, gosec, kubesec, phpcs-security-audit, pmd-apex, security-code-scan, semgrep, sobelow, spotbugs - stage: test diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 00000000..200b7372 --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1,13 @@ +# CODEOWNERS file +# To add additional teams to any approval, include them on the same line separated by spaces +# It is best practice to assign a team as a code owner and not an individual. 
+# Please submit requests for new teams to Systems and Automation + +# Global approval (all files) +# * @shardeum/team-name + +# Directory-level approval +/.github/ @shardeum/systems-and-automation + +# Specific file rules +# README.md @shardeum/team-name diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..b7a2b293 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Shardeum + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index f630e2f3..da3f96a7 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,11 @@ This is a node that runs as part of the shardus network, with the function of re To release, just run `npm run release` +## Health Check + +GET `/is-alive` this endpoint returns 200 if the server is running. +GET `/is-healthy` currently the same as `/is-alive` but will be expanded. + ## Contributing Contributions are very welcome! 
Everyone interacting in our codebases, issue trackers, and any other form of communication, including chat rooms and mailing lists, is expected to follow our [code of conduct](./CODE_OF_CONDUCT.md) so we can all enjoy the effort we put into this project. diff --git a/archiver-config.json b/archiver-config.json index ffac25a3..a892baf9 100644 --- a/archiver-config.json +++ b/archiver-config.json @@ -56,7 +56,7 @@ "publicKey": "aec5d2b663869d9c22ba99d8de76f3bff0f54fa5e39d2899ec1f3f4543422ec7" } ], - "ARCHIVER_MODE": "debug", + "ARCHIVER_MODE": "release", "DevPublicKey": "", "EXISTING_ARCHIVER_DB_PATH": "" } \ No newline at end of file diff --git a/debug_mode.patch b/debug_mode.patch new file mode 100644 index 00000000..cc6058c0 --- /dev/null +++ b/debug_mode.patch @@ -0,0 +1,27 @@ +diff --git a/archiver-config.json b/archiver-config.json +index a892baf..ffac25a 100644 +--- a/archiver-config.json ++++ b/archiver-config.json +@@ -56,7 +56,7 @@ + "publicKey": "aec5d2b663869d9c22ba99d8de76f3bff0f54fa5e39d2899ec1f3f4543422ec7" + } + ], +- "ARCHIVER_MODE": "release", ++ "ARCHIVER_MODE": "debug", + "DevPublicKey": "", + "EXISTING_ARCHIVER_DB_PATH": "" + } +\ No newline at end of file +diff --git a/src/Config.ts b/src/Config.ts +index 6b41ee4..a812003 100644 +--- a/src/Config.ts ++++ b/src/Config.ts +@@ -86,7 +86,7 @@ let config: Config = { + save: true, + interval: 1, + }, +- ARCHIVER_MODE: 'release', // 'debug'/'release' ++ ARCHIVER_MODE: 'debug', // 'debug'/'release' + DevPublicKey: '', + dataLogWrite: true, + dataLogWriter: { diff --git a/package-lock.json b/package-lock.json index 194a52a4..8afc5e77 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@shardus/archiver", - "version": "3.4.21", + "version": "3.4.23", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@shardus/archiver", - "version": "3.4.21", + "version": "3.4.23", "license": "ISC", "dependencies": { "@fastify/cors": "^8.2.0", diff --git a/package.json 
b/package.json index a3bf835b..8450e652 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@shardus/archiver", - "version": "3.4.21", + "version": "3.4.23", "engines": { "node": "18.16.1" }, @@ -17,12 +17,11 @@ "scripts": { "start": "npm run prepare && node build/server.js", "release": "npm run prepare && np --no-cleanup --no-tests --no-yarn --any-branch", - "test": "echo \"Error: no test specified\" && exit 1", "check": "gts check", "clean": "npm-run-all clean:*", "clean:typescript": "gts clean", - "lint": "eslint './src/**/*.ts'", - "lint-windows": "eslint ./src/**/*.ts", + "lint": "eslint \"./src/**/*.ts\"", + "test": "echo \"Error: no test specified\" && exit 1", "format-check": "prettier --check './src/**/*.ts'", "format-fix": "prettier --write './src/**/*.ts'", "clean:artifacts": "shx rm -rf archiver-logs/ archiver-db/ data-logs/", diff --git a/scripts/archiver_data_sync_check.ts b/scripts/archiver_data_sync_check.ts index 0d5555a4..704bf35d 100644 --- a/scripts/archiver_data_sync_check.ts +++ b/scripts/archiver_data_sync_check.ts @@ -4,6 +4,7 @@ import { join } from 'path' import { postJson } from '../src/P2P' import { config, overrideDefaultConfig } from '../src/Config' import { ArchiverNodeInfo } from '../src/State' +import { Utils as StringUtils } from '@shardus/types' const configFile = join(process.cwd(), 'archiver-config.json') overrideDefaultConfig(configFile) diff --git a/scripts/create_shut_down_cycle.ts b/scripts/create_shut_down_cycle.ts index ed793ee6..cf575733 100644 --- a/scripts/create_shut_down_cycle.ts +++ b/scripts/create_shut_down_cycle.ts @@ -10,6 +10,7 @@ import * as Logger from '../src/Logger' import { P2P } from '@shardus/types' import { addSigListeners } from '../src/State' import { computeCycleMarker } from '../src/Data/Cycles' +import { Utils as StringUtils } from '@shardus/types' const archiversAtShutdown = [ { diff --git a/scripts/repair_missing_cycle.ts b/scripts/repair_missing_cycle.ts index 
43188aea..a35b657d 100644 --- a/scripts/repair_missing_cycle.ts +++ b/scripts/repair_missing_cycle.ts @@ -1,6 +1,5 @@ import { readFileSync } from 'fs' -import { resolve } from 'path' -import { join } from 'path' +import { resolve, join } from 'path' import { overrideDefaultConfig, config } from '../src/Config' import * as Crypto from '../src/Crypto' import * as db from '../src/dbstore/sqlite3storage' @@ -8,6 +7,7 @@ import * as dbstore from '../src/dbstore' import * as CycleDB from '../src/dbstore/cycles' import { startSaving } from '../src/saveConsoleOutput' import * as Logger from '../src/Logger' +import { Utils as StringUtils } from '@shardus/types' const patchCycleData = false diff --git a/scripts/update_config.ts b/scripts/update_config.ts new file mode 100644 index 00000000..f78657a4 --- /dev/null +++ b/scripts/update_config.ts @@ -0,0 +1,50 @@ +import axios from 'axios' +import { join } from 'path' +import { Utils } from '@shardus/types' +import * as crypto from '@shardus/crypto-utils' +import { config, overrideDefaultConfig } from '../src/Config' + +const configFile = join(process.cwd(), 'archiver-config.json') +overrideDefaultConfig(configFile) + +crypto.init(config.ARCHIVER_HASH_KEY) + +const DEV_KEYS = { + pk: config.ARCHIVER_PUBLIC_KEY, + sk: config.ARCHIVER_SECRET_KEY, +} + +function sign(obj: T, sk: string, pk: string): T & any { + const objCopy = JSON.parse(crypto.stringify(obj)) + crypto.signObj(objCopy, sk, pk) + return objCopy +} + +function createSignature(data: any, pk: string, sk: string): any { + return sign({ ...data }, sk, pk) +} + +const UPDATE_CONFIG = { + /* Add Config properties that need to be updated here */ + VERBOSE: true, + RATE_LIMIT: 200, +} + +const INPUT = Utils.safeStringify(createSignature(UPDATE_CONFIG, DEV_KEYS.pk, DEV_KEYS.sk)) + +axios + .patch('http://127.0.0.1:4000/set-config', INPUT, { + headers: { + 'Content-Type': 'application/json', + }, + }) + .then((response) => { + console.log(response.data) + }) + .catch((error) 
=> { + if (error.response) { + console.error(error.response) + } else { + console.error(error.message) + } + }) diff --git a/scripts/update_network_account.ts b/scripts/update_network_account.ts index b72c5974..90cf1671 100644 --- a/scripts/update_network_account.ts +++ b/scripts/update_network_account.ts @@ -9,6 +9,7 @@ import { startSaving } from '../src/saveConsoleOutput' import * as Logger from '../src/Logger' import { accountSpecificHash } from '../src/shardeum/calculateAccountHash' import { addSigListeners } from '../src/State' +import { Utils as StringUtils } from '@shardus/types' const activeVersion = '1.9.0' const latestVersion = '1.9.0' diff --git a/scripts/validate_archiver_receipt.ts b/scripts/validate_archiver_receipt.ts index a7682f02..1785fef3 100644 --- a/scripts/validate_archiver_receipt.ts +++ b/scripts/validate_archiver_receipt.ts @@ -3,7 +3,7 @@ import { overrideDefaultConfig, config } from '../src/Config' import * as Crypto from '../src/Crypto' import * as Utils from '../src/Utils' import * as Receipt from '../src/dbstore/receipts' -import { AccountType, accountSpecificHash, fixAccountUint8Arrays } from '../src/shardeum/calculateAccountHash' +import { AccountType, accountSpecificHash } from '../src/shardeum/calculateAccountHash' import { ShardeumReceipt } from '../src/shardeum/verifyAppReceiptData' // Add the full receipt data here @@ -243,16 +243,6 @@ export const verifyAccountHash = (receipt: Receipt.ArchiverReceipt): boolean => try { if (receipt.globalModification && config.skipGlobalTxReceiptVerification) return true // return true if global modification for (const account of receipt.accounts) { - if (account.data.accountType === AccountType.Account) { - fixAccountUint8Arrays(account.data.account) - // console.dir(acc, { depth: null }) - } else if ( - account.data.accountType === AccountType.ContractCode || - account.data.accountType === AccountType.ContractStorage - ) { - fixAccountUint8Arrays(account.data) - // console.dir(acc, { depth: 
null }) - } const calculatedAccountHash = accountSpecificHash(account.data) const indexOfAccount = receipt.appliedReceipt.appliedVote.account_id.indexOf(account.accountId) if (indexOfAccount === -1) { diff --git a/scripts/verify_account_hash.ts b/scripts/verify_account_hash.ts index 5eaf8686..e0be9b99 100644 --- a/scripts/verify_account_hash.ts +++ b/scripts/verify_account_hash.ts @@ -7,8 +7,9 @@ import * as dbstore from '../src/dbstore' import * as AccountDB from '../src/dbstore/accounts' import { startSaving } from '../src/saveConsoleOutput' import * as Logger from '../src/Logger' -import { AccountType, fixAccountUint8Arrays, accountSpecificHash } from '../src/shardeum/calculateAccountHash' +import { AccountType, accountSpecificHash } from '../src/shardeum/calculateAccountHash' import { addSigListeners } from '../src/State' +import { Utils as StringUtils } from '@shardus/types' const updateHash = false const runProgram = async (): Promise => { @@ -47,16 +48,6 @@ const runProgram = async (): Promise => { if (accountHash1 !== accountHash2) { console.log(account.accountId, 'accountHash', accountHash1, 'accountHash2', accountHash2) } - if (account.data.accountType === AccountType.Account) { - fixAccountUint8Arrays(account.data.account) - // console.dir(acc, { depth: null }) - } else if ( - account.data.accountType === AccountType.ContractCode || - account.data.accountType === AccountType.ContractStorage - ) { - fixAccountUint8Arrays(account.data) - // console.dir(acc, { depth: null }) - } const calculatedAccountHash = accountSpecificHash(account.data) if (accountHash1 !== calculatedAccountHash) { diff --git a/src/API.ts b/src/API.ts index 202f7997..5335754c 100644 --- a/src/API.ts +++ b/src/API.ts @@ -1,7 +1,7 @@ import { Signature } from '@shardus/crypto-utils' import { FastifyInstance, FastifyRequest } from 'fastify' import { Server, IncomingMessage, ServerResponse } from 'http' -import { config } from './Config' +import { config, updateConfig, Config as 
ConfigInterface } from './Config' import * as Crypto from './Crypto' import * as State from './State' import * as NodeList from './NodeList' @@ -400,6 +400,13 @@ export function registerRoutes(server: FastifyInstance MAX_ORIGINAL_TXS_PER_REQUEST) { + reply.send({ + success: false, + error: `Exceed maximum limit of ${MAX_ORIGINAL_TXS_PER_REQUEST} original transactions`, + }) + return + } for (const [txId, txTimestamp] of txIdList) { if (typeof txId !== 'string' || txId.length !== TXID_LENGTH || typeof txTimestamp !== 'number') { reply.send({ @@ -515,6 +522,13 @@ export function registerRoutes(server: FastifyInstance MAX_RECEIPTS_PER_REQUEST) { + reply.send({ + success: false, + error: `Exceed maximum limit of ${MAX_RECEIPTS_PER_REQUEST} receipts`, + }) + return + } for (const [txId, txTimestamp] of txIdList) { if (typeof txId !== 'string' || txId.length !== TXID_LENGTH || typeof txTimestamp !== 'number') { reply.send({ @@ -854,6 +868,9 @@ export function registerRoutes(server: FastifyInstance + type ConfigPatchRequest = FastifyRequest<{ + Body: Partial + }> server.post('/get_account_data_archiver', async (_request: AccountDataRequest, reply) => { const payload = _request.body as AccountDataProvider.AccountDataRequestSchema @@ -875,7 +892,8 @@ export function registerRoutes(server: FastifyInstance { const payload = _request.body as AccountDataProvider.AccountDataByListRequestSchema - if (config.VERBOSE) Logger.mainLogger.debug('Account Data By List received', StringUtils.safeStringify(payload)) + if (config.VERBOSE) + Logger.mainLogger.debug('Account Data By List received', StringUtils.safeStringify(payload)) const result = AccountDataProvider.validateAccountDataByListRequest(payload) // Logger.mainLogger.debug('Account Data By List validation result', result) if (!result.success) { @@ -893,7 +911,8 @@ export function registerRoutes(server: FastifyInstance { const payload = _request.body as AccountDataProvider.GlobalAccountReportRequestSchema - if (config.VERBOSE) 
Logger.mainLogger.debug('Global Account Report received', StringUtils.safeStringify(payload)) + if (config.VERBOSE) + Logger.mainLogger.debug('Global Account Report received', StringUtils.safeStringify(payload)) const result = AccountDataProvider.validateGlobalAccountReportRequest(payload) // Logger.mainLogger.debug('Global Account Report validation result', result) if (!result.success) { @@ -906,6 +925,47 @@ export function registerRoutes(server: FastifyInstance { + isDebugMiddleware(_request, reply) + }, + }, + async (_request: ConfigPatchRequest, reply) => { + const RESTRICTED_PARAMS = [ + 'ARCHIVER_IP', + 'ARCHIVER_PORT', + 'ARCHIVER_HASH_KEY', + 'ARCHIVER_SECRET_KEY', + 'ARCHIVER_PUBLIC_KEY', + ] + try { + const { sign, ...newConfig } = _request.body + const validKeys = new Set(Object.keys(config)) + const payloadKeys = Object.keys(newConfig) + const invalidKeys = payloadKeys.filter( + (key) => !validKeys.has(key) || RESTRICTED_PARAMS.includes(key) + ) + + if (invalidKeys.length > 0) + throw new Error(`Invalid/Unauthorised config properties provided: ${invalidKeys.join(', ')}`) + + if (config.VERBOSE) + Logger.mainLogger.debug('Archiver config update executed: ', JSON.stringify(newConfig)) + + const updatedConfig = updateConfig(newConfig) + reply.send({ success: true, ...updatedConfig, ARCHIVER_SECRET_KEY: '' }) + } catch (error) { + reply.status(400).send({ success: false, reason: error.message }) + } + } + ) + // Config Endpoint server.get( '/config', @@ -1201,11 +1261,8 @@ export const queryFromArchivers = async ( url = `/totalData` break } - const filteredArchivers = State.activeArchivers.filter( - (archiver) => archiver.publicKey !== config.ARCHIVER_PUBLIC_KEY - ) const maxNumberofArchiversToRetry = 3 - const randomArchivers = Utils.getRandomItemFromArr(filteredArchivers, 0, maxNumberofArchiversToRetry) + const randomArchivers = Utils.getRandomItemFromArr(State.otherArchivers, 0, maxNumberofArchiversToRetry) let retry = 0 while (retry < 
maxNumberofArchiversToRetry) { // eslint-disable-next-line security/detect-object-injection diff --git a/src/Config.ts b/src/Config.ts index 350330db..ce439aef 100644 --- a/src/Config.ts +++ b/src/Config.ts @@ -57,6 +57,24 @@ export interface Config { } newPOQReceipt: boolean storeReceiptBeforeStates: boolean + waitingTimeForMissingTxData: number // Wait time in ms for missing tx data before collecting from other archivers + gossipToMoreArchivers: true // To gossip to more archivers in addition to adjacent archivers + randomGossipArchiversCount: 2 // Number of random archivers to gossip to + subscribeToMoreConsensors: boolean // To subscribe to more consensors when the number of active archivers is less than 4 + extraConsensorsToSubscribe: 1 // Number of extra consensors to subscribe to + // For debugging gossip data, set this to true. This will save only the gossip data received from the gossip archivers. + saveOnlyGossipData: boolean + // For debugging purpose, set this to true to stop gossiping tx data + stopGossipTxData: boolean + usePOQo: boolean + // The percentage of votes required to confirm transaction + requiredVotesPercentage: number + // max number of recent cycle shard data to keep + maxCyclesShardDataToKeep: number + // the number of cycles within which we want to keep changes to a config + configChangeMaxCyclesToKeep: number + // the number of config changes to keep + configChangeMaxChangesToKeep: number } let config: Config = { @@ -78,7 +96,7 @@ let config: Config = { save: true, interval: 1, }, - ARCHIVER_MODE: 'debug', // 'debug'/'release' + ARCHIVER_MODE: 'release', // 'debug'/'release' DevPublicKey: '', dataLogWrite: true, dataLogWriter: { @@ -112,6 +130,18 @@ let config: Config = { }, newPOQReceipt: false, storeReceiptBeforeStates: false, + waitingTimeForMissingTxData: 2000, // in ms + gossipToMoreArchivers: true, + randomGossipArchiversCount: 2, + subscribeToMoreConsensors: true, + extraConsensorsToSubscribe: 1, + saveOnlyGossipData: 
false, + stopGossipTxData: false, + usePOQo: true, + requiredVotesPercentage: 2 / 3, + maxCyclesShardDataToKeep: 10, + configChangeMaxCyclesToKeep: 5, + configChangeMaxChangesToKeep: 1000, } // Override default config params from config file, env vars, and cli args export async function overrideDefaultConfig(file: string): Promise { @@ -228,4 +258,18 @@ export async function overrideDefaultConfig(file: string): Promise { } } +export function updateConfig(newConfig: Partial): Config { + for (const key in newConfig) { + if (typeof newConfig[key] !== typeof config[key]) + throw new Error( + `Value with incorrect type passed to update the Archiver Config: ${key}:${ + newConfig[key] + } of type ${typeof newConfig[key]}` + ) + } + config = merge(config, newConfig) + Logger.mainLogger.info('Updated Archiver Config:', config) + return config +} + export { config } diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index e5007d3a..2ba2d489 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -19,14 +19,14 @@ import { getCurrentCycleCounter, shardValuesByCycle, computeCycleMarker } from ' import { bulkInsertCycles, queryCycleByMarker, updateCycle } from '../dbstore/cycles' import * as State from '../State' import * as Utils from '../Utils' -import { DataType, GossipData, adjacentArchivers, sendDataToAdjacentArchivers, TxData } from './GossipData' +import { DataType, GossipData, sendDataToAdjacentArchivers, TxData } from './GossipData' import { postJson } from '../P2P' import { globalAccountsMap, setGlobalNetworkAccount } from '../GlobalAccount' import { CycleLogWriter, ReceiptLogWriter, OriginalTxDataLogWriter } from '../Data/DataLogWriter' import * as OriginalTxDB from '../dbstore/originalTxsData' import ShardFunction from '../ShardFunctions' import { ConsensusNodeInfo } from '../NodeList' -import { verifyAccountHash } from '../shardeum/calculateAccountHash' +import { accountSpecificHash, verifyAccountHash } from '../shardeum/calculateAccountHash' 
import { verifyAppReceiptData } from '../shardeum/verifyAppReceiptData' import { Cycle as DbCycle } from '../dbstore/types' import { Utils as StringUtils } from '@shardus/types' @@ -44,13 +44,9 @@ const collectingMissingOriginalTxsMap: Map = new Map() interface MissingTx { txTimestamp: number receivedTimestamp: number + senders: string[] } -const WAIT_TIME_FOR_MISSING_TX_DATA = 2000 // in ms - -// For debugging gossip data, set this to true. This will save only the gossip data received from the adjacent archivers. -export const saveOnlyGossipData = false - type GET_TX_RECEIPT_RESPONSE = { success: boolean receipt?: Receipt.ArchiverReceipt | Receipt.AppliedReceipt2 @@ -389,12 +385,27 @@ export const verifyReceiptData = async ( receipt: Receipt.ArchiverReceipt, checkReceiptRobust = true ): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { - if (config.newPOQReceipt === false) return { success: true } const result = { success: false } // Check the signed nodes are part of the execution group nodes of the tx const { executionShardKey, cycle, appliedReceipt, globalModification } = receipt if (globalModification && config.skipGlobalTxReceiptVerification) return { success: true } - const { appliedVote, confirmOrChallenge } = appliedReceipt + const { appliedVote, signatures } = appliedReceipt + const { txId, timestamp } = receipt.tx + if (config.VERBOSE) { + const currentTimestamp = Date.now() + // Console log the timetaken between the receipt timestamp and the current time ( both in ms and s) + console.log( + `Time taken between receipt timestamp and current time: ${txId}`, + `${currentTimestamp - timestamp} ms`, + `${(currentTimestamp - timestamp) / 1000} s` + ) + } + const currentCycle = getCurrentCycleCounter() + if (currentCycle - cycle > 2) { + Logger.mainLogger.error( + `Found receipt with cycle older than 2 cycles ${txId}, ${cycle}, ${timestamp}, ${currentCycle}` + ) + } const cycleShardData = shardValuesByCycle.get(cycle) if 
(!cycleShardData) { Logger.mainLogger.error('Cycle shard data not found') @@ -403,6 +414,83 @@ export const verifyReceiptData = async ( } // Determine the home partition index of the primary account (executionShardKey) const { homePartition } = ShardFunction.addressToPartition(cycleShardData.shardGlobals, executionShardKey) + if (config.newPOQReceipt === false) { + // Refer to https://github.com/shardeum/shardus-core/blob/f7000c36faa0cd1e0832aa1e5e3b1414d32dcf66/src/state-manager/TransactionConsensus.ts#L1406 + let votingGroupCount = cycleShardData.shardGlobals.nodesPerConsenusGroup + if (votingGroupCount > cycleShardData.nodes.length) { + votingGroupCount = cycleShardData.nodes.length + } + const requiredSignatures = + config.usePOQo === true + ? Math.ceil(votingGroupCount * config.requiredVotesPercentage) + : Math.round(votingGroupCount * config.requiredVotesPercentage) + if (signatures.length < requiredSignatures) { + Logger.mainLogger.error( + `Invalid receipt appliedReceipt signatures count is less than requiredSignatures, ${signatures.length}, ${requiredSignatures}` + ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_appliedReceipt_signatures_count_less_than_requiredSignatures' + ) + return result + } + // Refer to https://github.com/shardeum/shardus-core/blob/50b6d00f53a35996cd69210ea817bee068a893d6/src/state-manager/TransactionConsensus.ts#L2799 + const voteHash = calculateVoteHash(appliedVote) + // Refer to https://github.com/shardeum/shardus-core/blob/50b6d00f53a35996cd69210ea817bee068a893d6/src/state-manager/TransactionConsensus.ts#L2663 + const appliedVoteHash = { + txid: txId, + voteHash, + } + // Using a map to store the good signatures to avoid duplicates + const goodSignatures = new Map() + for (const signature of signatures) { + const { owner: nodePubKey } = signature + // Get the node id from the public key + const node = cycleShardData.nodes.find((node) => node.publicKey === nodePubKey) + if 
(node == null) { + Logger.mainLogger.error( + `The node with public key ${nodePubKey} of the receipt ${txId}} with ${timestamp} is not in the active nodesList of cycle ${cycle}` + ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'appliedReceipt_signature_owner_not_in_active_nodesList' + ) + continue + } + // Check if the node is in the execution group + if (!cycleShardData.parititionShardDataMap.get(homePartition).coveredBy[node.id]) { + Logger.mainLogger.error( + `The node with public key ${nodePubKey} of the receipt ${txId} with ${timestamp} is not in the execution group of the tx` + ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'appliedReceipt_signature_node_not_in_execution_group_of_tx' + ) + continue + } + if (Crypto.verify({ ...appliedVoteHash, sign: signature })) { + goodSignatures.set(signature.owner, signature) + // Break the loop if the required number of good signatures are found + if (goodSignatures.size >= requiredSignatures) break + } + } + if (goodSignatures.size < requiredSignatures) { + Logger.mainLogger.error( + `Invalid receipt appliedReceipt valid signatures count is less than requiredSignatures ${goodSignatures.size}, ${requiredSignatures}` + ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_appliedReceipt_valid_signatures_count_less_than_requiredSignatures' + ) + return result + } + return { success: true } + } + const { confirmOrChallenge } = appliedReceipt // Check if the appliedVote node is in the execution group if (!cycleShardData.nodeShardDataMap.has(appliedVote.node_id)) { Logger.mainLogger.error('Invalid receipt appliedReceipt appliedVote node is not in the active nodesList') @@ -510,6 +598,35 @@ export const verifyReceiptData = async ( return { success: true } } +const calculateVoteHash = (vote: Receipt.AppliedVote): string => { + try { + if (config.usePOQo === true) { + const appliedHash = { + applied: 
vote.transaction_result, + cantApply: vote.cant_apply, + } + const stateHash = { + account_id: vote.account_id, + account_state_hash_after: vote.account_state_hash_after, + account_state_hash_before: vote.account_state_hash_before, + } + const appDataHash = { + app_data_hash: vote.app_data_hash, + } + const voteToHash = { + appliedHash: Crypto.hashObj(appliedHash), + stateHash: Crypto.hashObj(stateHash), + appDataHash: Crypto.hashObj(appDataHash), + } + return Crypto.hashObj(voteToHash) + } + return Crypto.hashObj({ ...vote, node_id: '' }) + } catch { + Logger.mainLogger.error('Error in calculateVoteHash', vote) + return '' + } +} + export const storeReceiptData = async ( receipts: Receipt.ArchiverReceipt[], senderInfo = '', @@ -737,6 +854,8 @@ export const storeReceiptData = async ( } if (combineAccounts.length > 0) await Account.bulkInsertAccounts(combineAccounts) if (combineTransactions.length > 0) await Transaction.bulkInsertTransactions(combineTransactions) + // If the archiver is not active, good to clean up the processed receipts map if it exceeds 2000 + if (!State.isActive && processedReceiptsMap.size > 2000) processedReceiptsMap.clear() } export const validateCycleData = (cycleRecord: P2PTypes.CycleCreatorTypes.CycleData): boolean => { @@ -825,12 +944,11 @@ interface StoreAccountParam { } export const storeAccountData = async (restoreData: StoreAccountParam = {}): Promise => { - console.log( - 'RestoreData', - 'accounts', - restoreData.accounts ? restoreData.accounts.length : 0, - 'receipts', - restoreData.receipts ? restoreData.receipts.length : 0 + Logger.mainLogger.debug( + `storeAccountData: ${restoreData.accounts ? restoreData.accounts.length : 0} accounts` + ) + Logger.mainLogger.debug( + `storeAccountData: ${restoreData.receipts ? 
restoreData.receipts.length : 0} receipts` ) const { accounts, receipts } = restoreData if (profilerInstance) profilerInstance.profileSectionStart('store_account_data') @@ -851,7 +969,28 @@ export const storeAccountData = async (restoreData: StoreAccountParam = {}): Pro // // await Account.insertAccount(account) // // } // } - if (accounts && accounts.length > 0) await Account.bulkInsertAccounts(accounts) + // + if (accounts && accounts.length > 0) { + const combineAccounts = [] + for (const account of accounts) { + try { + const calculatedAccountHash = accountSpecificHash(account.data) + if (calculatedAccountHash !== account.hash) { + Logger.mainLogger.error( + 'Invalid account hash', + account.accountId, + account.hash, + calculatedAccountHash + ) + continue + } + combineAccounts.push(account) + } catch (error) { + Logger.mainLogger.error('Error in calculating genesis account hash', error) + } + } + if (combineAccounts.length > 0) await Account.bulkInsertAccounts(accounts) + } if (receipts && receipts.length > 0) { Logger.mainLogger.debug('Received receipts Size', receipts.length) const combineTransactions = [] @@ -869,10 +1008,9 @@ export const storeAccountData = async (restoreData: StoreAccountParam = {}): Pro await Transaction.bulkInsertTransactions(combineTransactions) } if (profilerInstance) profilerInstance.profileSectionEnd('store_account_data') - console.log('Combined Accounts Data', combineAccountsData.accounts.length) Logger.mainLogger.debug('Combined Accounts Data', combineAccountsData.accounts.length) if (combineAccountsData.accounts.length > 0 || combineAccountsData.receipts.length > 0) { - console.log('Found combine accountsData') + Logger.mainLogger.debug('Found combine accountsData', combineAccountsData.accounts.length) const accountData = { ...combineAccountsData } clearCombinedAccountsData() storeAccountData(accountData) @@ -927,6 +1065,8 @@ export const storeOriginalTxData = async ( await 
OriginalTxsData.bulkInsertOriginalTxsData(combineOriginalTxsData) if (State.isActive) sendDataToAdjacentArchivers(DataType.ORIGINAL_TX_DATA, txDataList) } + // If the archiver is not active yet, good to clean up the processed originalTxs map if it exceeds 2000 + if (!State.isActive && processedOriginalTxsMap.size > 2000) processedOriginalTxsMap.clear() } interface validateResponse { success: boolean @@ -961,7 +1101,6 @@ export const validateGossipData = (data: GossipData): validateResponse => { let err = Utils.validateTypes(data, { dataType: 's', data: 'a', - sender: 's', sign: 'o', }) if (err) { @@ -973,13 +1112,9 @@ export const validateGossipData = (data: GossipData): validateResponse => { Logger.mainLogger.error('Invalid gossip data signature', err) return { success: false, reason: 'Invalid gossip data signature' + err } } - if (data.sign.owner !== data.sender) { - Logger.mainLogger.error('Data sender publicKey and sign owner key does not match') - return { success: false, error: 'Data sender publicKey and sign owner key does not match' } - } - if (!adjacentArchivers.has(data.sender)) { - Logger.mainLogger.error('Data sender is not the adjacent archiver') - return { success: false, error: 'Data sender not the adjacent archiver' } + if (!State.activeArchivers.some((archiver) => archiver.publicKey === data.sign.owner)) { + Logger.mainLogger.error('Data sender is not the active archivers') + return { success: false, error: 'Data sender not the active archivers' } } if ( data.dataType !== DataType.RECEIPT && @@ -997,7 +1132,8 @@ export const validateGossipData = (data: GossipData): validateResponse => { } export const processGossipData = (gossipdata: GossipData): void => { - const { dataType, data, sender } = gossipdata + const { dataType, data, sign } = gossipdata + const senderArchiver = State.activeArchivers.find((archiver) => archiver.publicKey === sign.owner) const receivedTimestamp = Date.now() if (dataType === DataType.RECEIPT) { for (const { txId, timestamp 
} of data as TxData[]) { @@ -1006,10 +1142,30 @@ export const processGossipData = (gossipdata: GossipData): void => { (receiptsInValidationMap.has(txId) && receiptsInValidationMap.get(txId) === timestamp) || (collectingMissingReceiptsMap.has(txId) && collectingMissingReceiptsMap.get(txId) === timestamp) ) { - // console.log('GOSSIP', 'RECEIPT', 'SKIP', txId, sender) + // console.log('GOSSIP', 'RECEIPT', 'SKIP', txId, 'sender', sign.owner) continue - } else missingReceiptsMap.set(txId, { txTimestamp: timestamp, receivedTimestamp }) - // console.log('GOSSIP', 'RECEIPT', 'MISS', txId, sender) + } else { + if (missingReceiptsMap.has(txId)) { + if ( + missingReceiptsMap.get(txId).txTimestamp === timestamp && + !missingReceiptsMap.get(txId).senders.some((sender) => sender === sign.owner) + ) + missingReceiptsMap.get(txId).senders.push(sign.owner) + else { + // Not expected to happen, but log error if it happens <-- could be malicious act of the sender + if (missingReceiptsMap.get(txId).txTimestamp !== timestamp) + Logger.mainLogger.error( + `Received gossip for receipt ${txId} with different timestamp ${timestamp} from archiver ${sign.owner}` + ) + if (missingReceiptsMap.get(txId).senders.some((sender) => sender === sign.owner)) + Logger.mainLogger.error( + `Received gossip for receipt ${txId} from the same sender ${sign.owner}` + ) + } + } else + missingReceiptsMap.set(txId, { txTimestamp: timestamp, receivedTimestamp, senders: [sign.owner] }) + // console.log('GOSSIP', 'RECEIPT', 'MISS', txId, 'sender', sign.owner) + } } } if (dataType === DataType.ORIGINAL_TX_DATA) { @@ -1019,120 +1175,164 @@ export const processGossipData = (gossipdata: GossipData): void => { (originalTxsInValidationMap.has(txId) && originalTxsInValidationMap.get(txId) === timestamp) || (collectingMissingOriginalTxsMap.has(txId) && collectingMissingOriginalTxsMap.get(txId) === timestamp) ) { - // console.log('GOSSIP', 'ORIGINAL_TX_DATA', 'SKIP', txId, sender) + // console.log('GOSSIP', 
'ORIGINAL_TX_DATA', 'SKIP', txId, 'sender', sign.owner) continue - } else missingOriginalTxsMap.set(txId, { txTimestamp: timestamp, receivedTimestamp }) - // console.log('GOSSIP', 'ORIGINAL_TX_DATA', 'MISS', txId, sender) + } else { + if (missingOriginalTxsMap.has(txId)) { + if ( + missingOriginalTxsMap.get(txId).txTimestamp === timestamp && + !missingOriginalTxsMap.get(txId).senders.some((sender) => sender === sign.owner) + ) + missingOriginalTxsMap.get(txId).senders.push(sign.owner) + else { + // Not expected to happen, but log error if it happens <-- could be malicious act of the sender + if (missingOriginalTxsMap.get(txId).txTimestamp !== timestamp) + Logger.mainLogger.error( + `Received gossip for originalTxData ${txId} with different timestamp ${timestamp} from archiver ${sign.owner}` + ) + if (missingOriginalTxsMap.get(txId).senders.some((sender) => sender === sign.owner)) + Logger.mainLogger.error( + `Received gossip for originalTxData ${txId} from the same sender ${sign.owner}` + ) + } + } else + missingOriginalTxsMap.set(txId, { + txTimestamp: timestamp, + receivedTimestamp, + senders: [sign.owner], + }) + // console.log('GOSSIP', 'ORIGINAL_TX_DATA', 'MISS', txId, 'sender', sign.owner) + } } } if (dataType === DataType.CYCLE) { collectCycleData( data as P2PTypes.CycleCreatorTypes.CycleData[], - adjacentArchivers.get(sender).ip + ':' + adjacentArchivers.get(sender).port + senderArchiver?.ip + ':' + senderArchiver?.port ) } } -export const collectMissingReceipts = async (): Promise => { - if (missingReceiptsMap.size === 0) return - const bucketSize = 100 +export const collectMissingTxDataFromArchivers = async (): Promise => { const currentTimestamp = Date.now() - const cloneMissingReceiptsMap: Map = new Map() - for (const [txId, { txTimestamp, receivedTimestamp }] of missingReceiptsMap) { - if (currentTimestamp - receivedTimestamp > WAIT_TIME_FOR_MISSING_TX_DATA) { - cloneMissingReceiptsMap.set(txId, txTimestamp) - collectingMissingReceiptsMap.set(txId, 
txTimestamp) - missingReceiptsMap.delete(txId) + if (missingReceiptsMap.size > 0) { + const cloneMissingReceiptsMap: Map> = new Map() + for (const [txId, { txTimestamp, receivedTimestamp, senders }] of missingReceiptsMap) { + if (currentTimestamp - receivedTimestamp > config.waitingTimeForMissingTxData) { + cloneMissingReceiptsMap.set(txId, { txTimestamp, senders }) + collectingMissingReceiptsMap.set(txId, txTimestamp) + missingReceiptsMap.delete(txId) + } } + if (cloneMissingReceiptsMap.size > 0) + Logger.mainLogger.debug('Collecting missing receipts', cloneMissingReceiptsMap.size) + for (const [txId, { txTimestamp, senders }] of cloneMissingReceiptsMap) { + collectMissingReceipts(senders, txId, txTimestamp) + } + cloneMissingReceiptsMap.clear() } - if (cloneMissingReceiptsMap.size === 0) return + if (missingOriginalTxsMap.size > 0) { + const cloneMissingOriginalTxsMap: Map> = new Map() + for (const [txId, { txTimestamp, receivedTimestamp, senders }] of missingOriginalTxsMap) { + if (currentTimestamp - receivedTimestamp > config.waitingTimeForMissingTxData) { + cloneMissingOriginalTxsMap.set(txId, { txTimestamp, senders }) + collectingMissingOriginalTxsMap.set(txId, txTimestamp) + missingOriginalTxsMap.delete(txId) + } + } + if (cloneMissingOriginalTxsMap.size > 0) + Logger.mainLogger.debug('Collecting missing originalTxsData', cloneMissingOriginalTxsMap.size) + for (const [txId, { txTimestamp, senders }] of cloneMissingOriginalTxsMap) { + collectMissingOriginalTxsData(senders, txId, txTimestamp) + } + cloneMissingOriginalTxsMap.clear() + } +} + +export const collectMissingReceipts = async ( + senders: string[], + txId: string, + txTimestamp: number +): Promise => { + const txIdList: [string, number][] = [[txId, txTimestamp]] + let foundTxData = false + const senderArchivers = State.activeArchivers.filter((archiver) => senders.includes(archiver.publicKey)) Logger.mainLogger.debug( - 'Collecting missing receipts', - cloneMissingReceiptsMap.size, - 
cloneMissingReceiptsMap + `Collecting missing receipt for txId ${txId} with timestamp ${txTimestamp} from archivers`, + senderArchivers.map((a) => a.ip + ':' + a.port) ) - // Try to get missing receipts from 3 different archivers if one archiver fails to return some receipts - const maxRetry = 3 - let retry = 0 - const archiversToUse: State.ArchiverNodeInfo[] = getArchiversToUse() - while (cloneMissingReceiptsMap.size > 0 && retry < maxRetry) { - // eslint-disable-next-line security/detect-object-injection - let archiver = archiversToUse[retry] - if (!archiver) archiver = archiversToUse[0] - const txIdList: [string, number][] = [] - let totalEntries = cloneMissingReceiptsMap.size - for (const [txId, txTimestamp] of cloneMissingReceiptsMap) { - totalEntries-- - if ( - (processedReceiptsMap.has(txId) && processedReceiptsMap.get(txId) === txTimestamp) || - (receiptsInValidationMap.has(txId) && receiptsInValidationMap.get(txId) === txTimestamp) - ) { - cloneMissingReceiptsMap.delete(txId) - collectingMissingReceiptsMap.delete(txId) - if (totalEntries !== 0) continue - } else txIdList.push([txId, txTimestamp]) - if (txIdList.length !== bucketSize && totalEntries !== 0) continue - if (txIdList.length === 0) continue - const receipts = (await queryTxDataFromArchivers( - archiver, - DataType.RECEIPT, - txIdList - )) as Receipt.Receipt[] - if (receipts && receipts.length > -1) { - const receiptsToSave = [] - for (const receipt of receipts) { - const { receiptId, timestamp } = receipt - if ( - cloneMissingReceiptsMap.has(receiptId) && - cloneMissingReceiptsMap.get(receiptId) === timestamp - ) { - cloneMissingReceiptsMap.delete(receiptId) - collectingMissingReceiptsMap.delete(txId) - receiptsToSave.push(receipt) - } + for (const senderArchiver of senderArchivers) { + if ( + (processedReceiptsMap.has(txId) && processedReceiptsMap.get(txId) === txTimestamp) || + (receiptsInValidationMap.has(txId) && receiptsInValidationMap.get(txId) === txTimestamp) + ) { + foundTxData = true + 
break + } + const receipts = (await queryTxDataFromArchivers( + senderArchiver, + DataType.RECEIPT, + txIdList + )) as Receipt.Receipt[] + if (receipts && receipts.length > 0) { + for (const receipt of receipts) { + const { receiptId, timestamp } = receipt + if (txId === receiptId && txTimestamp === timestamp) { + storeReceiptData([receipt], senderArchiver.ip + ':' + senderArchiver.port, true) + foundTxData = true } - await storeReceiptData(receiptsToSave, archiver.ip + ':' + archiver.port, true) } } - retry++ + if (foundTxData) break } - if (cloneMissingReceiptsMap.size > 0) { - Logger.mainLogger.debug( - 'Receipts TxId that are failed to get from other archivers', - cloneMissingReceiptsMap + if (!foundTxData) { + Logger.mainLogger.error( + `Failed to collect receipt for txId ${txId} with timestamp ${txTimestamp} from archivers ${senders}` ) - // Clear the failed txIds from the collectingMissingReceiptsMap - for (const [txId] of cloneMissingReceiptsMap) { - collectingMissingReceiptsMap.delete(txId) - } } + collectingMissingOriginalTxsMap.delete(txId) } -export const getArchiversToUse = (): State.ArchiverNodeInfo[] => { - let archiversToUse: State.ArchiverNodeInfo[] = [] - const MAX_ARCHIVERS_TO_SELECT = 3 - // Choosing MAX_ARCHIVERS_TO_SELECT random archivers from the active archivers list - if (State.activeArchivers.length <= MAX_ARCHIVERS_TO_SELECT) { - State.activeArchivers.forEach( - (archiver) => archiver.publicKey !== State.getNodeInfo().publicKey && archiversToUse.push(archiver) - ) - } else { - // Filter out the adjacent archivers and self archiver from the active archivers list - const activeArchivers = [...State.activeArchivers].filter( - (archiver) => - adjacentArchivers.has(archiver.publicKey) || archiver.publicKey === State.getNodeInfo().publicKey - ) - archiversToUse = Utils.getRandomItemFromArr(activeArchivers, 0, MAX_ARCHIVERS_TO_SELECT) - if (archiversToUse.length < MAX_ARCHIVERS_TO_SELECT) { - const requiredArchivers = MAX_ARCHIVERS_TO_SELECT - 
archiversToUse.length - // If the required archivers are not selected, then get it from the adjacent archivers - archiversToUse = [ - ...archiversToUse, - ...Utils.getRandomItemFromArr([...adjacentArchivers.values()], requiredArchivers), - ] +const collectMissingOriginalTxsData = async ( + senders: string[], + txId: string, + txTimestamp: number +): Promise => { + const txIdList: [string, number][] = [[txId, txTimestamp]] + let foundTxData = false + const senderArchivers = State.activeArchivers.filter((archiver) => senders.includes(archiver.publicKey)) + Logger.mainLogger.debug( + `Collecting missing originalTxData for txId ${txId} with timestamp ${txTimestamp} from archivers`, + senderArchivers.map((a) => a.ip + ':' + a.port) + ) + for (const senderArchiver of senderArchivers) { + if ( + (processedOriginalTxsMap.has(txId) && processedOriginalTxsMap.get(txId) === txTimestamp) || + (originalTxsInValidationMap.has(txId) && originalTxsInValidationMap.get(txId) === txTimestamp) + ) { + foundTxData = true + break } + const originalTxs = (await queryTxDataFromArchivers( + senderArchiver, + DataType.ORIGINAL_TX_DATA, + txIdList + )) as OriginalTxDB.OriginalTxData[] + if (originalTxs && originalTxs.length > 0) { + for (const originalTx of originalTxs) + if (txId === originalTx.txId && txTimestamp === originalTx.timestamp) { + storeOriginalTxData([originalTx], senderArchiver.ip + ':' + senderArchiver.port) + foundTxData = true + } + } + if (foundTxData) break } - return archiversToUse + if (!foundTxData) { + Logger.mainLogger.error( + `Failed to collect originalTxData for txId ${txId} with timestamp ${txTimestamp} from archivers ${senders}` + ) + } + collectingMissingReceiptsMap.delete(txId) } type TxDataFromArchiversResponse = { @@ -1174,78 +1374,6 @@ export const queryTxDataFromArchivers = async ( return null } -export const collectMissingOriginalTxsData = async (): Promise => { - if (missingOriginalTxsMap.size === 0) return - const bucketSize = 100 - const 
currentTimestamp = Date.now() - const cloneMissingOriginalTxsMap: Map = new Map() - for (const [txId, { txTimestamp, receivedTimestamp }] of missingOriginalTxsMap) { - if (currentTimestamp - receivedTimestamp > WAIT_TIME_FOR_MISSING_TX_DATA) { - cloneMissingOriginalTxsMap.set(txId, txTimestamp) - collectingMissingOriginalTxsMap.set(txId, txTimestamp) - missingOriginalTxsMap.delete(txId) - } - } - if (cloneMissingOriginalTxsMap.size === 0) return - Logger.mainLogger.debug( - 'Collecting missing originalTxsData', - cloneMissingOriginalTxsMap.size, - cloneMissingOriginalTxsMap - ) - // Try to get missing originalTxs from 3 different archivers if one archiver fails to return some receipts - const maxRetry = 3 - let retry = 0 - const archiversToUse: State.ArchiverNodeInfo[] = getArchiversToUse() - while (cloneMissingOriginalTxsMap.size > 0 && retry < maxRetry) { - // eslint-disable-next-line security/detect-object-injection - let archiver = archiversToUse[retry] - if (!archiver) archiver = archiversToUse[0] - const txIdList: [string, number][] = [] - let totalEntries = cloneMissingOriginalTxsMap.size - for (const [txId, txTimestamp] of cloneMissingOriginalTxsMap) { - totalEntries-- - if ( - (processedOriginalTxsMap.has(txId) && processedOriginalTxsMap.get(txId) === txTimestamp) || - (originalTxsInValidationMap.has(txId) && originalTxsInValidationMap.get(txId) === txTimestamp) - ) { - cloneMissingOriginalTxsMap.delete(txId) - collectingMissingOriginalTxsMap.delete(txId) - if (totalEntries !== 0) continue - } else txIdList.push([txId, txTimestamp]) - if (txIdList.length !== bucketSize && totalEntries !== 0) continue - if (txIdList.length === 0) continue - const originalTxs = (await queryTxDataFromArchivers( - archiver, - DataType.ORIGINAL_TX_DATA, - txIdList - )) as OriginalTxDB.OriginalTxData[] - if (originalTxs && originalTxs.length > -1) { - const originalTxsDataToSave = [] - for (const originalTx of originalTxs) { - const { txId, timestamp } = originalTx - if 
(cloneMissingOriginalTxsMap.has(txId) && cloneMissingOriginalTxsMap.get(txId) === timestamp) { - cloneMissingOriginalTxsMap.delete(txId) - collectingMissingOriginalTxsMap.delete(txId) - originalTxsDataToSave.push(originalTx) - } - } - await storeOriginalTxData(originalTxsDataToSave, archiver.ip + ':' + archiver.port) - } - } - retry++ - } - if (cloneMissingOriginalTxsMap.size > 0) { - Logger.mainLogger.debug( - 'OriginalTxsData TxId that are failed to get from other archivers', - cloneMissingOriginalTxsMap - ) - // Clear the failed txIds from the collectingMissingOriginalTxsMap - for (const [txId] of cloneMissingOriginalTxsMap) { - collectingMissingOriginalTxsMap.delete(txId) - } - } -} - export function cleanOldReceiptsMap(timestamp: number): void { let savedReceiptsCount = 0 for (const [key, value] of processedReceiptsMap) { @@ -1277,9 +1405,8 @@ export function cleanOldOriginalTxsMap(timestamp: number): void { } export const scheduleMissingTxsDataQuery = (): void => { - // Set to collect missing txs data in every 5 seconds + // Set to collect missing txs data in every 1 second setInterval(() => { - collectMissingReceipts() - collectMissingOriginalTxsData() + collectMissingTxDataFromArchivers() }, 1000) } diff --git a/src/Data/Cycles.ts b/src/Data/Cycles.ts index 005eb41f..d0e5a324 100644 --- a/src/Data/Cycles.ts +++ b/src/Data/Cycles.ts @@ -29,6 +29,7 @@ import { RequestDataType, queryFromArchivers } from '../API' import { stringifyReduce } from '../profiler/StringifyReduce' import { addCyclesToCache } from '../cache/cycleRecordsCache' import { queryLatestCycleRecords } from '../dbstore/cycles' +import { updateGlobalNetworkAccount } from '../GlobalAccount' export interface ArchiverCycleResponse { cycleInfo: P2PTypes.CycleCreatorTypes.CycleData[] @@ -52,8 +53,6 @@ export let cycleRecordWithShutDownMode = null as P2PTypes.CycleCreatorTypes.Cycl export let currentNetworkMode: P2PTypes.ModesTypes.Record['mode'] = 'forming' export const shardValuesByCycle = new Map() 
-const CYCLE_SHARD_STORAGE_LIMIT = 3 - export async function processCycles(cycles: P2PTypes.CycleCreatorTypes.CycleData[]): Promise { if (profilerInstance) profilerInstance.profileSectionStart('process_cycle', false) try { @@ -85,6 +84,8 @@ export async function processCycles(cycles: P2PTypes.CycleCreatorTypes.CycleData // Check the archivers reputaion in every new cycle & record the status recordArchiversReputation() } + await updateGlobalNetworkAccount(cycle.counter) + if (currentNetworkMode === 'shutdown') { Logger.mainLogger.debug(Date.now(), `❌ Shutdown Cycle Record received at Cycle #: ${cycle.counter}`) await Utils.sleep(currentCycleDuration) @@ -93,10 +94,11 @@ export async function processCycles(cycles: P2PTypes.CycleCreatorTypes.CycleData setShutdownCycleRecord(cycle) NodeList.toggleFirstNode() } - // Clean receipts/originalTxs cache that are older than 5 minutes - const cleanupTimestamp = Date.now() - 5 * 60 * 1000 + // Clean receipts/originalTxs cache that are older than minutes + const cleanupTimestamp = (cycle.start - config.maxCyclesShardDataToKeep * 60) * 1000 cleanOldOriginalTxsMap(cleanupTimestamp) cleanOldReceiptsMap(cleanupTimestamp) + cleanShardCycleData(cycle.counter - config.maxCyclesShardDataToKeep) } } finally { if (profilerInstance) profilerInstance.profileSectionEnd('process_cycle', false) @@ -518,8 +520,13 @@ function updateShardValues(cycle: P2PTypes.CycleCreatorTypes.CycleData): void { const list = cycleShardData.nodes.map((n) => n['ip'] + ':' + n['port']) Logger.mainLogger.debug('cycleShardData', cycleShardData.cycleNumber, list.length, stringifyReduce(list)) shardValuesByCycle.set(cycleShardData.cycleNumber, cycleShardData) - if (shardValuesByCycle.size > CYCLE_SHARD_STORAGE_LIMIT) { - shardValuesByCycle.delete(shardValuesByCycle.keys().next().value) +} + +const cleanShardCycleData = (cycleNumber: number): void => { + for (const [key] of shardValuesByCycle) { + if (key < cycleNumber) { + shardValuesByCycle.delete(key) + } } } diff 
--git a/src/Data/Data.ts b/src/Data/Data.ts index f98500c3..d7a44150 100644 --- a/src/Data/Data.ts +++ b/src/Data/Data.ts @@ -17,7 +17,7 @@ import { ChangeSquasher, parse, totalNodeCount, activeNodeCount, applyNodeListCh import * as State from '../State' import * as P2P from '../P2P' import * as Utils from '../Utils' -import { config } from '../Config' +import { config, updateConfig } from '../Config' import { P2P as P2PTypes } from '@shardus/types' import * as Logger from '../Logger' import { nestedCountersInstance } from '../profiler/nestedCounters' @@ -27,7 +27,6 @@ import { storeAccountData, storingAccountData, storeOriginalTxData, - saveOnlyGossipData, } from './Collector' import * as CycleDB from '../dbstore/cycles' import * as ReceiptDB from '../dbstore/receipts' @@ -56,7 +55,6 @@ let subsetNodesMapByConsensusRadius: Map = const maxCyclesInCycleTracker = 5 const receivedCycleTracker = {} const QUERY_TIMEOUT_MAX = 30 // 30seconds - const { MAX_ACCOUNTS_PER_REQUEST, MAX_RECEIPTS_PER_REQUEST, @@ -65,6 +63,11 @@ const { MAX_BETWEEN_CYCLES_PER_REQUEST, } = config.REQUEST_LIMIT +const GENESIS_ACCOUNTS_CYCLE_RANGE = { + startCycle: 0, + endCycle: 5, +} + export enum DataRequestTypes { SUBSCRIBE = 'SUBSCRIBE', UNSUBSCRIBE = 'UNSUBSCRIBE', @@ -259,7 +262,7 @@ export function initSocketClient(node: NodeList.ConsensusNodeInfo): void { storeOriginalTxData( newData.responses.ORIGINAL_TX_DATA, sender.nodeInfo.ip + ':' + sender.nodeInfo.port, - saveOnlyGossipData + config.saveOnlyGossipData ) } if (newData.responses && newData.responses.RECEIPT) { @@ -275,33 +278,41 @@ export function initSocketClient(node: NodeList.ConsensusNodeInfo): void { newData.responses.RECEIPT, sender.nodeInfo.ip + ':' + sender.nodeInfo.port, true, - saveOnlyGossipData + config.saveOnlyGossipData ) } if (newData.responses && newData.responses.CYCLE) { collectCycleData(newData.responses.CYCLE, sender.nodeInfo.ip + ':' + sender.nodeInfo.port) } if (newData.responses && newData.responses.ACCOUNT) { - 
console.log( - 'RECEIVED ACCOUNTS DATA', - sender.nodeInfo.publicKey, - sender.nodeInfo.ip, - sender.nodeInfo.port - ) - Logger.mainLogger.debug( - 'RECEIVED ACCOUNTS DATA', - sender.nodeInfo.publicKey, - sender.nodeInfo.ip, - sender.nodeInfo.port - ) + if (getCurrentCycleCounter() > GENESIS_ACCOUNTS_CYCLE_RANGE.endCycle) { + Logger.mainLogger.error( + 'Account data is not meant to be received after the genesis accounts cycle range', + getCurrentCycleCounter() + ) + unsubscribeDataSender(sender.nodeInfo.publicKey) + return + } + if ( + Cycles.currentNetworkMode !== 'forming' || + NodeList.byPublicKey.size > 1 || + !NodeList.byPublicKey.has(sender.nodeInfo.publicKey) + ) { + Logger.mainLogger.error( + 'Account data is not meant to be received by the first validator', + `Number of nodes in the network ${NodeList.byPublicKey.size}` + ) + unsubscribeDataSender(sender.nodeInfo.publicKey) + return + } + Logger.mainLogger.debug(`RECEIVED ACCOUNTS DATA FROM ${sender.nodeInfo.ip}:${sender.nodeInfo.port}`) nestedCountersInstance.countEvent('genesis', 'accounts', 1) if (!forwardGenesisAccounts) { - console.log('Genesis Accounts To Sycn', newData.responses.ACCOUNT) Logger.mainLogger.debug('Genesis Accounts To Sycn', newData.responses.ACCOUNT) syncGenesisAccountsFromConsensor(newData.responses.ACCOUNT, sender.nodeInfo) } else { if (storingAccountData) { - console.log('Storing Data') + Logger.mainLogger.debug('Storing Account Data') let newCombineAccountsData = { ...combineAccountsData } if (newData.responses.ACCOUNT.accounts) newCombineAccountsData.accounts = [ @@ -323,17 +334,14 @@ export function initSocketClient(node: NodeList.ConsensusNodeInfo): void { } // Set new contactTimeout for sender. 
Postpone sender removal because data is still received from consensor - if (currentCycleDuration > 0) { - nestedCountersInstance.countEvent('archiver', 'postpone_contact_timeout') - // To make sure that the sender is still in the subscribed list - sender = dataSenders.get(newData.publicKey) - if (sender) - sender.contactTimeout = createContactTimeout( - sender.nodeInfo.publicKey, - 'This timeout is created after processing data' - ) - } - return + nestedCountersInstance.countEvent('archiver', 'postpone_contact_timeout') + // To make sure that the sender is still in the subscribed list + sender = dataSenders.get(newData.publicKey) + if (sender) + sender.contactTimeout = createContactTimeout( + sender.nodeInfo.publicKey, + 'This timeout is created after processing data' + ) } }) } @@ -479,25 +487,7 @@ export async function replaceDataSender(publicKey: NodeList.ConsensusNodeInfo['p ) return } - - // Check if there is any subscribed node from this subset - let foundSubscribedNodeFromThisSubset = false - for (const node of Object.values(subsetNodesList)) { - if (dataSenders.has(node.publicKey)) { - if (config.VERBOSE) Logger.mainLogger.debug('This node from the subset is in the subscribed list!') - if (foundSubscribedNodeFromThisSubset) { - // Unsubscribe the extra nodes from this subset - unsubscribeDataSender(node.publicKey) - } - foundSubscribedNodeFromThisSubset = true - } - } - - if (!foundSubscribedNodeFromThisSubset) { - Logger.mainLogger.debug('There is no subscribed node from this subset!') - // Pick a new dataSender from this subset - subscribeNodeFromThisSubset(subsetNodesList) - } + subscribeNodeFromThisSubset(subsetNodesList) } } } @@ -535,49 +525,119 @@ export function addDataSender(sender: DataSender): void { dataSenders.set(sender.nodeInfo.publicKey, sender) } +async function syncFromNetworkConfig(): Promise { + try { + // Define the query function to get the network config from a node + const queryFn = async (node): Promise => { + const 
REQUEST_NETCONFIG_TIMEOUT_SECOND = 3 // 3s timeout + try { + const response = await P2P.getJson( + `http://${node.ip}:${node.port}/netconfig`, + REQUEST_NETCONFIG_TIMEOUT_SECOND + ) + return response + } catch (error) { + Logger.mainLogger.error(`Error querying node ${node.ip}:${node.port}: ${error}`) + return null + } + } + // Define the equality function to compare two responses + const equalityFn = (responseA, responseB): boolean => { + return ( + responseA?.config?.sharding?.nodesPerConsensusGroup === + responseB?.config?.sharding?.nodesPerConsensusGroup + ) + } + // Get the list of 10 max random active nodes or the first node if no active nodes are available + const nodes = + NodeList.getActiveNodeCount() > 0 ? NodeList.getRandomActiveNodes(10) : [NodeList.getFirstNode()] + // Use robustQuery to get the consensusRadius from multiple nodes + const tallyItem = await robustQuery( + nodes, + queryFn, + equalityFn, + 3 // Redundancy (minimum 3 nodes should return the same result to reach consensus) + ) + if (tallyItem?.value?.config?.stateManager) { + // Updating the Archiver Config as per the latest Network Config + const { + useNewPOQ: newPOQReceipt, + configChangeMaxChangesToKeep, + configChangeMaxCyclesToKeep, + maxCyclesShardDataToKeep, + } = tallyItem.value.config.stateManager + // const devPublicKeys = tallyItem.value.config.debug.devPublicKeys + // const devPublicKey = + // devPublicKeys && + // Object.keys(devPublicKeys).length >= 3 && + // Object.keys(devPublicKeys).find((key) => devPublicKeys[key] === 3) + // if ( + // devPublicKey && + // typeof devPublicKey === typeof config.DevPublicKey && + // devPublicKey !== config.DevPublicKey + // ) + // updateConfig({ DevPublicKey: devPublicKey }) + if ( + !Utils.isUndefined(newPOQReceipt) && + typeof newPOQReceipt === typeof config.newPOQReceipt && + newPOQReceipt !== config.newPOQReceipt + ) + updateConfig({ newPOQReceipt }) + if ( + !Utils.isUndefined(configChangeMaxChangesToKeep) && + typeof 
configChangeMaxChangesToKeep === typeof config.configChangeMaxChangesToKeep && + configChangeMaxChangesToKeep !== config.configChangeMaxChangesToKeep + ) + updateConfig({ configChangeMaxChangesToKeep }) + if ( + !Utils.isUndefined(configChangeMaxCyclesToKeep) && + typeof configChangeMaxCyclesToKeep === typeof config.configChangeMaxCyclesToKeep && + configChangeMaxCyclesToKeep !== config.configChangeMaxCyclesToKeep + ) + updateConfig({ configChangeMaxCyclesToKeep }) + if ( + !Utils.isUndefined(maxCyclesShardDataToKeep) && + typeof maxCyclesShardDataToKeep === typeof config.maxCyclesShardDataToKeep && + maxCyclesShardDataToKeep !== config.maxCyclesShardDataToKeep + ) + updateConfig({ maxCyclesShardDataToKeep }) + return tallyItem + } + return null + } catch (error) { + Logger.mainLogger.error('❌ Error in syncFromNetworkConfig: ', error) + return null + } +} + async function getConsensusRadius(): Promise { // If there is no node, return existing currentConsensusRadius if (NodeList.isEmpty()) return currentConsensusRadius - // Define the query function to get the network config from a node - const queryFn = async (node): Promise => { - const REQUEST_NETCONFIG_TIMEOUT_SECOND = 2 // 2s timeout - try { - const response = await P2P.getJson( - `http://${node.ip}:${node.port}/netconfig`, - REQUEST_NETCONFIG_TIMEOUT_SECOND + const tallyItem = await syncFromNetworkConfig() + if (tallyItem?.value?.config) { + const nodesPerEdgeFromConfig = tallyItem.value.config.sharding?.nodesPerEdge + const nodesPerConsensusGroupFromConfig = tallyItem.value.config.sharding?.nodesPerConsensusGroup + + if (!Number.isInteger(nodesPerConsensusGroupFromConfig) || nodesPerConsensusGroupFromConfig <= 0) { + Logger.mainLogger.error( + 'nodesPerConsensusGroup is not a valid number:', + nodesPerConsensusGroupFromConfig ) - return response - } catch (error) { - Logger.mainLogger.error(`Error querying node ${node.ip}:${node.port}: ${error}`) - return null + return currentConsensusRadius } - } - // Define 
the equality function to compare two responses - const equalityFn = (responseA, responseB): boolean => { - return ( - responseA?.config?.sharding?.nodesPerConsensusGroup === - responseB?.config?.sharding?.nodesPerConsensusGroup + if (!Number.isInteger(nodesPerEdgeFromConfig) || nodesPerEdgeFromConfig <= 0) { + Logger.mainLogger.error('nodesPerEdge is not a valid number:', nodesPerEdgeFromConfig) + return currentConsensusRadius + } + if ( + nodesPerConsensusGroup === nodesPerConsensusGroupFromConfig && + nodesPerEdge === nodesPerEdgeFromConfig ) - } - - // Get the list of 10 max random active nodes or the first node if no active nodes are available - const nodes = - NodeList.getActiveNodeCount() > 0 ? NodeList.getRandomActiveNodes(10) : [NodeList.getFirstNode()] - - // Use robustQuery to get the consensusRadius from multiple nodes - const tallyItem = await robustQuery( - nodes, - queryFn, - equalityFn, - 3 // Redundancy (minimum 3 nodes should return the same result to reach consensus) - ) - - // Check if a consensus was reached - if (tallyItem && tallyItem.value && tallyItem.value.config) { - nodesPerConsensusGroup = tallyItem.value.config.sharding.nodesPerConsensusGroup - nodesPerEdge = tallyItem.value.config.sharding.nodesPerEdge + return currentConsensusRadius + nodesPerConsensusGroup = nodesPerConsensusGroupFromConfig + nodesPerEdge = nodesPerEdgeFromConfig // Upgrading consensus size to an odd number if (nodesPerConsensusGroup % 2 === 0) nodesPerConsensusGroup++ const consensusRadius = Math.floor((nodesPerConsensusGroup - 1) / 2) @@ -635,7 +695,11 @@ export async function createNodesGroupByConsensusRadius(): Promise { currentConsensusRadius = consensusRadius const activeList = [...NodeList.activeListByIdSorted] if (config.VERBOSE) Logger.mainLogger.debug('activeList', activeList.length, activeList) - const totalNumberOfNodesToSubscribe = Math.ceil(activeList.length / consensusRadius) + let totalNumberOfNodesToSubscribe = Math.ceil(activeList.length / 
consensusRadius) + // Only if there are less than 4 activeArchivers and if the consensusRadius is greater than 5 + if (config.subscribeToMoreConsensors && State.activeArchivers.length < 4 && currentConsensusRadius > 5) { + totalNumberOfNodesToSubscribe += totalNumberOfNodesToSubscribe * config.extraConsensorsToSubscribe + } Logger.mainLogger.debug('totalNumberOfNodesToSubscribe', totalNumberOfNodesToSubscribe) subsetNodesMapByConsensusRadius = new Map() let round = 0 @@ -652,45 +716,57 @@ export async function subscribeConsensorsByConsensusRadius(): Promise { await createNodesGroupByConsensusRadius() for (const [i, subsetList] of subsetNodesMapByConsensusRadius) { if (config.VERBOSE) Logger.mainLogger.debug('Round', i, 'subsetList', subsetList, dataSenders.keys()) - let foundSubscribedNodeFromThisSubset = false - for (const node of Object.values(subsetList)) { - if (dataSenders.has(node.publicKey)) { - if (config.VERBOSE) Logger.mainLogger.debug('This node from the subset is in the subscribed list!') - if (foundSubscribedNodeFromThisSubset) { - // Unsubscribe the extra nodes from this subset - unsubscribeDataSender(node.publicKey) - } - foundSubscribedNodeFromThisSubset = true - } - } - - if (!foundSubscribedNodeFromThisSubset) { - Logger.mainLogger.debug('There is no subscribed node from this subset!') - // Pick a new dataSender from this subset - subscribeNodeFromThisSubset(subsetList) - } + subscribeNodeFromThisSubset(subsetList) } } export async function subscribeNodeFromThisSubset(nodeList: NodeList.ConsensusNodeInfo[]): Promise { + // First check if there is any subscribed node from this subset + const subscribedNodesFromThisSubset = [] + for (const node of nodeList) { + if (dataSenders.has(node.publicKey)) { + if (config.VERBOSE) + Logger.mainLogger.debug('This node from the subset is in the subscribed list!', node.publicKey) + subscribedNodesFromThisSubset.push(node.publicKey) + } + } + let numberOfNodesToSubsribe = 1 + // Only if there are less than 4 
activeArchivers and if the consensusRadius is greater than 5 + if (config.subscribeToMoreConsensors && State.activeArchivers.length < 4 && currentConsensusRadius > 5) { + numberOfNodesToSubsribe += config.extraConsensorsToSubscribe + } + if (subscribedNodesFromThisSubset.length > numberOfNodesToSubsribe) { + // If there is more than one subscribed node from this subset, unsubscribe the extra ones + for (const publicKey of subscribedNodesFromThisSubset.splice(numberOfNodesToSubsribe)) { + Logger.mainLogger.debug('Unsubscribing extra node from this subset', publicKey) + unsubscribeDataSender(publicKey) + } + } + if (config.VERBOSE) + Logger.mainLogger.debug('Subscribed nodes from this subset', subscribedNodesFromThisSubset) + if (subscribedNodesFromThisSubset.length === numberOfNodesToSubsribe) return + Logger.mainLogger.debug('Subscribing node from this subset!') + // Pick a new dataSender from this subset let subsetList = [...nodeList] // Pick a random dataSender let newSenderInfo = nodeList[Math.floor(Math.random() * nodeList.length)] let connectionStatus = false let retry = 0 - const MAX_RETRY_SUBSCRIPTION = 3 - while (retry < MAX_RETRY_SUBSCRIPTION) { + const MAX_RETRY_SUBSCRIPTION = 3 * numberOfNodesToSubsribe + while (retry < MAX_RETRY_SUBSCRIPTION && subscribedNodesFromThisSubset.length < numberOfNodesToSubsribe) { if (!dataSenders.has(newSenderInfo.publicKey)) { connectionStatus = await createDataTransferConnection(newSenderInfo) if (connectionStatus) { - break - } else { - subsetList = subsetList.filter((node) => node.publicKey !== newSenderInfo.publicKey) + // Check if the newSender is in the subscribed nodes of this subset + if (!subscribedNodesFromThisSubset.includes(newSenderInfo.publicKey)) + subscribedNodesFromThisSubset.push(newSenderInfo.publicKey) } } else { - // This means there is already a subscribed node from this subset - break + // Add the newSender to the subscribed nodes of this subset + if 
(!subscribedNodesFromThisSubset.includes(newSenderInfo.publicKey)) + subscribedNodesFromThisSubset.push(newSenderInfo.publicKey) } + subsetList = subsetList.filter((node) => node.publicKey !== newSenderInfo.publicKey) if (subsetList.length > 0) { newSenderInfo = subsetList[Math.floor(Math.random() * subsetList.length)] } else { @@ -974,7 +1050,7 @@ export async function syncGenesisAccountsFromArchiver(): Promise { // } const res = (await queryFromArchivers( RequestDataType.ACCOUNT, - { startCycle: 0, endCycle: 5 }, + { startCycle: GENESIS_ACCOUNTS_CYCLE_RANGE.startCycle, endCycle: GENESIS_ACCOUNTS_CYCLE_RANGE.endCycle }, QUERY_TIMEOUT_MAX )) as ArchiverAccountResponse if (res && (res.totalAccounts || res.totalAccounts === 0)) { @@ -991,8 +1067,8 @@ export async function syncGenesisAccountsFromArchiver(): Promise { const response = (await queryFromArchivers( RequestDataType.ACCOUNT, { - startCycle: 0, - endCycle: 5, + startCycle: GENESIS_ACCOUNTS_CYCLE_RANGE.startCycle, + endCycle: GENESIS_ACCOUNTS_CYCLE_RANGE.endCycle, page, }, QUERY_TIMEOUT_MAX @@ -1024,8 +1100,8 @@ export async function syncGenesisTransactionsFromArchiver(): Promise { const res = (await queryFromArchivers( RequestDataType.TRANSACTION, { - startCycle: 0, - endCycle: 5, + startCycle: GENESIS_ACCOUNTS_CYCLE_RANGE.startCycle, + endCycle: GENESIS_ACCOUNTS_CYCLE_RANGE.endCycle, }, QUERY_TIMEOUT_MAX )) as ArchiverTransactionResponse @@ -1043,8 +1119,8 @@ export async function syncGenesisTransactionsFromArchiver(): Promise { const response = (await queryFromArchivers( RequestDataType.TRANSACTION, { - startCycle: 0, - endCycle: 5, + startCycle: GENESIS_ACCOUNTS_CYCLE_RANGE.startCycle, + endCycle: GENESIS_ACCOUNTS_CYCLE_RANGE.endCycle, page, }, QUERY_TIMEOUT_MAX diff --git a/src/Data/GossipData.ts b/src/Data/GossipData.ts index 461dd0a7..d2414e6d 100644 --- a/src/Data/GossipData.ts +++ b/src/Data/GossipData.ts @@ -4,9 +4,11 @@ import * as Crypto from '../Crypto' import { postJson } from '../P2P' import { 
Signature } from '@shardus/crypto-utils' import { P2P as P2PTypes } from '@shardus/types' +import * as Utils from '../Utils' +import { config } from '../Config' // adjacentArchivers are one archiver from left and one archiver from right of the current archiver -export let adjacentArchivers: Map = new Map() +export let adjacentArchivers: State.ArchiverNodeInfo[] = [] export enum DataType { RECEIPT = 'RECEIPT', @@ -19,16 +21,15 @@ export type TxData = { txId: string; timestamp: number } export interface GossipData { dataType: DataType data: TxData[] | P2PTypes.CycleCreatorTypes.CycleData[] - sender: string sign: Signature } -// For debugging purpose, set this to true to stop gossiping tx data -const stopGossipTxData = false +// List of archivers that are not adjacent to the current archiver +const remainingArchivers = [] export const getAdjacentLeftAndRightArchivers = (): void => { if (State.activeArchivers.length <= 1) { - adjacentArchivers = new Map() + adjacentArchivers = [] return } // Treat the archivers list as a circular list and get one left and one right archivers of the current archiver @@ -50,31 +51,44 @@ export const getAdjacentLeftAndRightArchivers = (): void => { rightArchiver = State.activeArchiversByPublicKeySorted[rightArchiverIndex] /* eslint-enable security/detect-object-injection */ } - adjacentArchivers = new Map() - if (leftArchiver) adjacentArchivers.set(leftArchiver.publicKey, leftArchiver) - if (rightArchiver) adjacentArchivers.set(rightArchiver.publicKey, rightArchiver) + adjacentArchivers.length = 0 + if (leftArchiver) adjacentArchivers.push(leftArchiver) + if (rightArchiver) adjacentArchivers.push(rightArchiver) + remainingArchivers.length = 0 + for (const archiver of State.otherArchivers) { + if (!adjacentArchivers.some((a) => a.publicKey === archiver.publicKey)) { + remainingArchivers.push(archiver) + } + } } export async function sendDataToAdjacentArchivers( dataType: DataType, data: GossipData['data'] ): Promise { - if 
(stopGossipTxData) return - if (adjacentArchivers.size === 0) return + if (config.stopGossipTxData) return + if (State.otherArchivers.length === 0) return const gossipPayload = { dataType, data, - sender: State.getNodeInfo().publicKey, } as GossipData const signedDataToSend = Crypto.sign(gossipPayload) try { - Logger.mainLogger.debug( - `Sending ${dataType} data to the archivers: ${Array.from(adjacentArchivers.values()).map( - (n) => `${n.ip}:${n.port}` - )}` - ) const promises = [] - for (const [, archiver] of adjacentArchivers) { + const archiversToSend = [...adjacentArchivers] + if (config.gossipToMoreArchivers && remainingArchivers.length > 0) { + const randomArchivers = Utils.getRandomItemFromArr( + remainingArchivers, + 0, + config.randomGossipArchiversCount + ) + if (randomArchivers.length > 0) archiversToSend.push(...randomArchivers) + } + if (config.VERBOSE) + Logger.mainLogger.debug( + `Sending ${dataType} data to the archivers: ${archiversToSend.map((n) => `${n.ip}:${n.port}`)}` + ) + for (const archiver of archiversToSend) { const url = `http://${archiver.ip}:${archiver.port}/gossip-data` try { const GOSSIP_DATA_TIMEOUT_SECOND = 10 // 10 seconds @@ -84,13 +98,13 @@ export async function sendDataToAdjacentArchivers( }) promises.push(promise) } catch (e) { - Logger.mainLogger.error('Error', e) + Logger.mainLogger.error(`Gossip Error to archiver ${archiver.ip}: ${archiver.port}`, e) } } try { - await Promise.all(promises) + await Promise.allSettled(promises) } catch (err) { - Logger.mainLogger.error('Network: ' + err) + Logger.mainLogger.error('Gossip Error: ' + err) } } catch (ex) { Logger.mainLogger.debug(ex) diff --git a/src/GlobalAccount.ts b/src/GlobalAccount.ts index 376fc8da..663cf920 100644 --- a/src/GlobalAccount.ts +++ b/src/GlobalAccount.ts @@ -7,8 +7,9 @@ import { config } from './Config' import { postJson, getJson } from './P2P' import { robustQuery, deepCopy } from './Utils' import { isDeepStrictEqual } from 'util' +import { 
accountSpecificHash } from './shardeum/calculateAccountHash' -let cachedGlobalNetworkAccount: object +let cachedGlobalNetworkAccount: AccountDB.AccountCopy let cachedGlobalNetworkAccountHash: string interface Node { @@ -21,6 +22,7 @@ export interface GlobalAccountsHashAndTimestamp { timestamp: number } export const globalAccountsMap = new Map() +const appliedConfigChanges = new Set() export function getGlobalNetworkAccount(hash: boolean): object | string { if (hash) { @@ -35,6 +37,145 @@ export function setGlobalNetworkAccount(account: AccountDB.AccountCopy): void { cachedGlobalNetworkAccountHash = account.hash } +interface NetworkConfigChanges { + cycle: number + change: any + appData: any +} + +export const updateGlobalNetworkAccount = async (cycleNumber: number): Promise => { + if (!cachedGlobalNetworkAccountHash) return + const networkAccount = rfdc()(cachedGlobalNetworkAccount) + const changes = networkAccount.data.listOfChanges as NetworkConfigChanges[] + if (!changes || !Array.isArray(changes)) { + return + } + const activeConfigChanges = new Set() + for (const change of changes) { + // skip future changes + if (change.cycle > cycleNumber) { + continue + } + const changeHash = Crypto.hashObj(change) + // skip handled changes + if (appliedConfigChanges.has(changeHash)) { + activeConfigChanges.add(changeHash) + continue + } + // apply this change + appliedConfigChanges.add(changeHash) + activeConfigChanges.add(changeHash) + const changeObj = change.change + const appData = change.appData + + // If there is initShutdown change, if the latest cycle is greater than the cycle of the change, then skip it + if (changeObj['p2p'] && changeObj['p2p']['initShutdown'] && change.cycle !== cycleNumber) continue + + const newChanges = pruneNetworkChangeQueue(changes, cycleNumber) + networkAccount.data.listOfChanges = newChanges + // https://github.com/shardeum/shardeum/blob/c449ecd21391747c5b7173da3a74415da2acb0be/src/index.ts#L6958 + // Increase the timestamp by 1 second + 
// networkAccount.data.timestamp += 1000 + + if (appData) { + updateNetworkChangeQueue(networkAccount.data, appData) + console.dir(networkAccount.data, { depth: null }) + // https://github.com/shardeum/shardeum/blob/c449ecd21391747c5b7173da3a74415da2acb0be/src/index.ts#L6889 + // Increase the timestamp by 1 second + // networkAccount.data.timestamp += 1000 + } + + networkAccount.hash = accountSpecificHash(networkAccount.data) + networkAccount.timestamp = networkAccount.data.timestamp + Logger.mainLogger.debug('updateGlobalNetworkAccount', networkAccount) + await AccountDB.updateAccount(networkAccount) + setGlobalNetworkAccount(networkAccount) + } + if (activeConfigChanges.size > 0) { + // clear the entries from appliedConfigChanges that are no longer in the changes list + for (const changeHash of appliedConfigChanges) { + if (!activeConfigChanges.has(changeHash)) { + appliedConfigChanges.delete(changeHash) + } + } + } +} + +const generatePathKeys = (obj: any, prefix = ''): string[] => { + /* eslint-disable security/detect-object-injection */ + let paths: string[] = [] + + // Loop over each key in the object + for (const key of Object.keys(obj)) { + // If the value corresponding to this key is an object (and not an array or null), + // then recurse into it. + if (obj[key] !== null && typeof obj[key] === 'object' && !Array.isArray(obj[key])) { + paths = paths.concat(generatePathKeys(obj[key], prefix + key + '.')) + } else { + // Otherwise, just append this key to the path. 
+ paths.push(prefix + key) + } + } + return paths + /* eslint-enable security/detect-object-injection */ +} + +const pruneNetworkChangeQueue = ( + changes: NetworkConfigChanges[], + currentCycle: number +): NetworkConfigChanges[] => { + const configsMap = new Map() + const keepAliveCount = config.configChangeMaxChangesToKeep + for (let i = changes.length - 1; i >= 0; i--) { + const thisChange = changes[i] + let keepAlive = false + + let appConfigs = [] + if (thisChange.appData) { + appConfigs = generatePathKeys(thisChange.appData, 'appdata.') + } + const shardusConfigs: string[] = generatePathKeys(thisChange.change) + + const allConfigs = appConfigs.concat(shardusConfigs) + + for (const config of allConfigs) { + if (!configsMap.has(config)) { + configsMap.set(config, 1) + keepAlive = true + } else if (configsMap.get(config) < keepAliveCount) { + configsMap.set(config, configsMap.get(config) + 1) + keepAlive = true + } + } + + if (currentCycle - thisChange.cycle <= config.configChangeMaxCyclesToKeep) { + keepAlive = true + } + + if (keepAlive == false) { + changes.splice(i, 1) + } + } + return changes +} + +const updateNetworkChangeQueue = (data: object, appData: object): void => { + if ('current' in data) patchAndUpdate(data?.current, appData) +} + +const patchAndUpdate = (existingObject: any, changeObj: any, parentPath = ''): void => { + /* eslint-disable security/detect-object-injection */ + for (const [key, value] of Object.entries(changeObj)) { + if (existingObject[key] != null) { + if (typeof value === 'object') { + patchAndUpdate(existingObject[key], value, parentPath === '' ? key : parentPath + '.' 
+ key) + } else { + existingObject[key] = value + } + } + } +} + export const loadGlobalAccounts = async (): Promise => { const sql = `SELECT * FROM accounts WHERE isGlobal=1` const values = [] @@ -48,9 +189,6 @@ export const loadGlobalAccounts = async (): Promise => { } export const syncGlobalAccount = async (retry = 5): Promise => { - const filteredArchivers = State.activeArchivers.filter( - (archiver) => archiver.publicKey !== config.ARCHIVER_PUBLIC_KEY - ) while (retry > 0) { try { // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -71,7 +209,7 @@ export const syncGlobalAccount = async (retry = 5): Promise => { return equivalent } - const globalAccsResponse = await robustQuery(filteredArchivers, queryFn, equalFn, 3, true) + const globalAccsResponse = await robustQuery(State.otherArchivers, queryFn, equalFn, 3, true) Logger.mainLogger.debug('syncGlobalAccount() - globalAccsResponse', globalAccsResponse) if (!globalAccsResponse) { Logger.mainLogger.warn('() - robustResponse is null') @@ -97,7 +235,7 @@ export const syncGlobalAccount = async (retry = 5): Promise => { const queryFn = async (node: Node): Promise => { return await getJson(`http://${node.ip}:${node.port}/get-network-account?hash=false`) } - const networkAccResponse = await robustQuery(filteredArchivers, queryFn, equalFn, 3, true) + const networkAccResponse = await robustQuery(State.otherArchivers, queryFn, equalFn, 3, true) Logger.mainLogger.debug('syncGlobalAccount() - networkAccResponse', networkAccResponse) if (!networkAccResponse) { Logger.mainLogger.warn('get-network-account() - robustResponse is null') diff --git a/src/ShardFunctions.ts b/src/ShardFunctions.ts index 616b15d7..d0d62564 100644 --- a/src/ShardFunctions.ts +++ b/src/ShardFunctions.ts @@ -52,7 +52,7 @@ class ShardFunctions { //make sure nodesPerConsenusGroup is an odd number >= 3 if (nodesPerConsenusGroup % 2 === 0 || nodesPerConsenusGroup < 3) { - throw new Error(`nodesPerConsenusGroup:${nodesPerConsenusGroup} must be 
odd and >= 3`) + throw new Error(`nodesPerConsenusGroup: ${nodesPerConsenusGroup} must be odd and >= 3`) } shardGlobals.consensusRadius = Math.floor((nodesPerConsenusGroup - 1) / 2) diff --git a/src/State.ts b/src/State.ts index a98cc078..8423ff16 100644 --- a/src/State.ts +++ b/src/State.ts @@ -34,6 +34,8 @@ const nodeState: ArchiverNodeState = { export const joinedArchivers: ArchiverNodeInfo[] = [] // Add joined archivers to this list first and move to activeArchivers when they are active export let activeArchivers: ArchiverNodeInfo[] = [] export let activeArchiversByPublicKeySorted: ArchiverNodeInfo[] = [] +// archivers list without the current archiver +export let otherArchivers: ArchiverNodeInfo[] = [] export let isFirst = false export let isActive = false export const archiversReputation: Map = new Map() @@ -163,6 +165,9 @@ export function addArchiver(archiver: ArchiverNodeInfo): void { 'activeArchiversByPublicKeySorted', activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey) ) + if (archiver.publicKey !== config.ARCHIVER_PUBLIC_KEY) { + otherArchivers.push(archiver) + } Logger.mainLogger.debug('New archiver added to active list', archiver) } Logger.mainLogger.debug('archivers list', activeArchivers) @@ -173,6 +178,7 @@ export function removeActiveArchiver(publicKey: string): void { activeArchiversByPublicKeySorted = activeArchiversByPublicKeySorted.filter( (a: ArchiverNodeInfo) => a.publicKey !== publicKey ) + otherArchivers = otherArchivers.filter((a: ArchiverNodeInfo) => a.publicKey !== publicKey) archiversReputation.delete(publicKey) } @@ -180,6 +186,7 @@ export function resetActiveArchivers(archivers: ArchiverNodeInfo[]): void { Logger.mainLogger.debug('Resetting active archivers.', archivers) activeArchivers = archivers activeArchiversByPublicKeySorted = [...archivers.sort(NodeList.byAscendingPublicKey)] + otherArchivers = activeArchivers.filter((a) => a.publicKey !== config.ARCHIVER_PUBLIC_KEY) archiversReputation.clear() for (const 
archiver of activeArchivers) { archiversReputation.set(archiver.publicKey, 'up') @@ -252,9 +259,6 @@ export function setActive(): void { } export function getRandomArchiver(): ArchiverNodeInfo { - const filteredArchivers = activeArchivers.filter( - (archiver) => archiver.publicKey !== config.ARCHIVER_PUBLIC_KEY - ) - const randomArchiver = Utils.getRandomItemFromArr(filteredArchivers)[0] + const randomArchiver = Utils.getRandomItemFromArr(otherArchivers)[0] return randomArchiver } diff --git a/src/dbstore/index.ts b/src/dbstore/index.ts index 9d5f8977..9870e439 100644 --- a/src/dbstore/index.ts +++ b/src/dbstore/index.ts @@ -15,7 +15,7 @@ export const initializeDB = async (config: Config): Promise => { await db.runCreate( 'CREATE TABLE if not exists `cycles` (`cycleMarker` TEXT NOT NULL UNIQUE PRIMARY KEY, `counter` NUMBER NOT NULL, `cycleRecord` JSON NOT NULL)' ) - await db.runCreate('CREATE INDEX if not exists `cycles_idx` ON `cycles` (`counter` DESC)') + await db.runCreate('CREATE INDEX if not exists `cycles_idx` ON `cycles` (`counter` ASC)') await db.runCreate( 'CREATE TABLE if not exists `accounts` (`accountId` TEXT NOT NULL UNIQUE PRIMARY KEY, `data` JSON NOT NULL, `timestamp` BIGINT NOT NULL, `hash` TEXT NOT NULL, `cycleNumber` NUMBER NOT NULL, `isGlobal` BOOLEAN NOT NULL)' ) @@ -29,7 +29,6 @@ export const initializeDB = async (config: Config): Promise => { await db.runCreate( 'CREATE TABLE if not exists `originalTxsData` (`txId` TEXT NOT NULL, `timestamp` BIGINT NOT NULL, `cycle` NUMBER NOT NULL, `originalTxData` JSON NOT NULL, PRIMARY KEY (`txId`, `timestamp`))' ) - // await db.runCreate('Drop INDEX if exists `originalTxData_idx`'); await db.runCreate( 'CREATE INDEX if not exists `originalTxsData_idx` ON `originalTxsData` (`cycle` ASC, `timestamp` ASC)' ) diff --git a/src/dbstore/sqlite3storage.ts b/src/dbstore/sqlite3storage.ts index c5302e4b..667ff841 100644 --- a/src/dbstore/sqlite3storage.ts +++ b/src/dbstore/sqlite3storage.ts @@ -22,6 +22,13 @@ export 
async function init(config: Config): Promise { } }) await run('PRAGMA journal_mode=WAL') + db.on('profile', (sql, time) => { + if (time > 500 && time < 1000) { + console.log('SLOW QUERY', sql, time) + } else if (time > 1000) { + console.log('VERY SLOW QUERY', sql, time) + } + }) console.log('Database initialized.') } diff --git a/src/routes/healthCheck.ts b/src/routes/healthCheck.ts new file mode 100644 index 00000000..69ab6e2f --- /dev/null +++ b/src/routes/healthCheck.ts @@ -0,0 +1,14 @@ +import { FastifyPluginCallback } from 'fastify' + +export const healthCheckRouter: FastifyPluginCallback = function (fastify, opts, done) { + fastify.get('/is-alive', (req, res) => { + return res.status(200).send('OK') + }) + + fastify.get('/is-healthy', (req, res) => { + // TODO: Add actual health check logic + return res.status(200).send('OK') + }) + + done() +} \ No newline at end of file diff --git a/src/server.ts b/src/server.ts index 80f81529..f05463b7 100644 --- a/src/server.ts +++ b/src/server.ts @@ -40,6 +40,7 @@ import { loadGlobalAccounts, syncGlobalAccount } from './GlobalAccount' import { setShutdownCycleRecord, cycleRecordWithShutDownMode } from './Data/Cycles' import { registerRoutes } from './API' import { Utils as StringUtils } from '@shardus/types' +import { healthCheckRouter } from './routes/healthCheck' const configFile = join(process.cwd(), 'archiver-config.json') let logDir: string @@ -438,6 +439,7 @@ async function startServer(): Promise { timeWindow: 10, allowList: ['127.0.0.1', '0.0.0.0'], // Excludes local IPs from rate limits }) + await server.register(healthCheckRouter) server.addContentTypeParser('application/json', { parseAs: 'string' }, (req, body, done) => { try { diff --git a/src/shardeum/calculateAccountHash.ts b/src/shardeum/calculateAccountHash.ts index 61e6853e..326d08f9 100644 --- a/src/shardeum/calculateAccountHash.ts +++ b/src/shardeum/calculateAccountHash.ts @@ -57,17 +57,6 @@ export const accountSpecificHash = (account: any): string => { 
return hash } -// Converting the correct account data format to get the correct hash -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export const fixAccountUint8Arrays = (account: any): void => { - if (!account) return // if account is null, return - if (account.storageRoot) account.storageRoot = Uint8Array.from(Object.values(account.storageRoot)) // Account - if (account.codeHash) account.codeHash = Uint8Array.from(Object.values(account.codeHash)) // - //Account and ContractCode - if (account.codeByte) account.codeByte = Uint8Array.from(Object.values(account.codeByte)) // ContractCode - if (account.value) account.value = Uint8Array.from(Object.values(account.value)) // ContractByte -} - export const verifyAccountHash = (receipt: ArchiverReceipt): boolean => { try { if (receipt.globalModification && config.skipGlobalTxReceiptVerification) return true // return true if global modification @@ -87,16 +76,6 @@ export const verifyAccountHash = (receipt: ArchiverReceipt): boolean => { return false } for (const account of receipt.accounts) { - if (account.data.accountType === AccountType.Account) { - fixAccountUint8Arrays(account.data.account) - // console.dir(acc, { depth: null }) - } else if ( - account.data.accountType === AccountType.ContractCode || - account.data.accountType === AccountType.ContractStorage - ) { - fixAccountUint8Arrays(account.data) - // console.dir(acc, { depth: null }) - } const calculatedAccountHash = accountSpecificHash(account.data) const indexOfAccount = receipt.appliedReceipt.appliedVote.account_id.indexOf(account.accountId) if (indexOfAccount === -1) {