diff --git a/src/Data/Cycles.ts b/src/Data/Cycles.ts index 46de6800..f4aa04ce 100644 --- a/src/Data/Cycles.ts +++ b/src/Data/Cycles.ts @@ -328,43 +328,15 @@ function updateNodeList(cycle: P2PTypes.CycleCreatorTypes.CycleData): void { NodeList.removeNodes(lostAfterSelectionPks) for (const joinedArchiver of joinedArchivers) { - const foundArchiver = State.activeArchivers.find((a) => a.publicKey === joinedArchiver.publicKey) - if (!foundArchiver) { - State.activeArchivers.push(joinedArchiver) - Utils.insertSorted( - State.activeArchiversByPublicKeySorted, - joinedArchiver, - NodeList.byAscendingPublicKey - ) - Logger.mainLogger.debug( - 'activeArchiversByPublicKeySorted', - State.activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey) - ) - Logger.mainLogger.debug('New archiver added to active list', joinedArchiver) - } - Logger.mainLogger.debug('active archiver list', State.activeArchivers) + State.addArchiver(joinedArchiver) } for (const refreshedArchiver of refreshedArchivers) { - const foundArchiver = State.activeArchivers.find((a) => a.publicKey === refreshedArchiver.publicKey) - if (!foundArchiver) { - State.activeArchivers.push(refreshedArchiver) - Utils.insertSorted( - State.activeArchiversByPublicKeySorted, - refreshedArchiver, - NodeList.byAscendingPublicKey - ) - Logger.mainLogger.debug( - 'activeArchiversByPublicKeySorted', - State.activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey) - ) - Logger.mainLogger.debug('Refreshed archiver added to active list', refreshedArchiver) - } + State.addArchiver(refreshedArchiver) } for (const leavingArchiver of leavingArchivers) { State.removeActiveArchiver(leavingArchiver.publicKey) - State.archiversReputation.delete(leavingArchiver.publicKey) } const nodesToUnsubscribed = [...apoptosizedPks, ...removedPks] diff --git a/src/State.ts b/src/State.ts index c1b925f0..a98cc078 100644 --- a/src/State.ts +++ b/src/State.ts @@ -100,16 +100,7 @@ export async function initFromConfig(config: Config, shutDownMode = false): Prom continue } if (response && response.nodeList && response.nodeList.length > 0) { - activeArchivers.push(existingArchivers[i]) - Utils.insertSorted( - activeArchiversByPublicKeySorted, - existingArchivers[i], - NodeList.byAscendingPublicKey - ) - Logger.mainLogger.debug( - 'activeArchiversByPublicKeySorted', - activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey) - ) + addArchiver(existingArchivers[i]) } } /* eslint-enable security/detect-object-injection */ @@ -162,11 +153,27 @@ export function addSigListeners(sigint = true, sigterm = true): void { Logger.mainLogger.debug('Registerd exit signal listeners.') } +export function addArchiver(archiver: ArchiverNodeInfo): void { + const foundArchiver = activeArchivers.find((a) => a.publicKey === archiver.publicKey) + if (!foundArchiver) { + Logger.mainLogger.debug('Adding archiver', archiver) + activeArchivers.push(archiver) + Utils.insertSorted(activeArchiversByPublicKeySorted, archiver, NodeList.byAscendingPublicKey) + Logger.mainLogger.debug( + 'activeArchiversByPublicKeySorted', + activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey) + ) + Logger.mainLogger.debug('New archiver added to active list', archiver) + } + Logger.mainLogger.debug('archivers list', activeArchivers) +} + export function removeActiveArchiver(publicKey: string): void { activeArchivers = activeArchivers.filter((a: ArchiverNodeInfo) => a.publicKey !== publicKey) activeArchiversByPublicKeySorted = activeArchiversByPublicKeySorted.filter( (a: ArchiverNodeInfo) => a.publicKey !== publicKey ) + archiversReputation.delete(publicKey) } export function resetActiveArchivers(archivers: ArchiverNodeInfo[]): void { diff --git a/src/sync-v2/index.ts b/src/sync-v2/index.ts index 648e9f95..d13359c4 100644 --- a/src/sync-v2/index.ts +++ b/src/sync-v2/index.ts @@ -12,11 +12,13 @@ import { getValidatorListFromNode, robustQueryForStandbyNodeListHash, getStandbyNodeListFromNode, + robustQueryForArchiverListHash, + getArchiverListFromNode, } from './queries' -import { ArchiverNodeInfo } from '../State' +import { ArchiverNodeInfo, resetActiveArchivers } from '../State' import { getActiveNodeListFromArchiver } from '../NodeList' import * as NodeList from '../NodeList' -import { verifyCycleRecord, verifyValidatorList } from './verify' +import { verifyArchiverList, verifyCycleRecord, verifyValidatorList } from './verify' import * as Logger from '../Logger' /** @@ -53,62 +55,75 @@ export function syncV2( return ResultAsync.fromPromise(getActiveListFromSomeArchiver(activeArchivers), (e: Error) => e).andThen( (nodeList) => syncValidatorList(nodeList).andThen(([validatorList, validatorListHash]) => - syncStandbyNodeList(nodeList).andThen(([standbyList, standbyListHash]) => - syncLatestCycleRecordAndMarker(nodeList).andThen(([cycle, cycleMarker]) => { - Logger.mainLogger.debug('syncV2: validatorList', validatorList) + syncArchiverList(nodeList).andThen(([archiverList, archiverListHash]) => + syncStandbyNodeList(nodeList).andThen(([standbyList, standbyListHash]) => + syncLatestCycleRecordAndMarker(nodeList).andThen(([cycle, cycleMarker]) => { + Logger.mainLogger.debug('syncV2: validatorList', validatorList) - // additional checks to make sure the list hashes in the cycle - // matches the hash for the validator list retrieved earlier - if (cycle.nodeListHash !== validatorListHash) { - return errAsync( - new Error( - `validator list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})` + // additional checks to make sure the list hashes in the cycle + // matches the hash for the validator list retrieved earlier + if (cycle.nodeListHash !== validatorListHash) { + return errAsync( + new Error( + `validator list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})` + ) ) - ) - } - if (cycle.standbyNodeListHash !== standbyListHash) { - return errAsync( - new Error( - `standby list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})` + } + if (cycle.standbyNodeListHash !== standbyListHash) { + return errAsync( + new Error( + `standby list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})` + ) + ) + } + if (cycle.archiverListHash !== archiverListHash) { + return errAsync( + new Error( + `archiver list hash from received cycle (${cycle.archiverListHash}) does not match the hash received from robust query (${archiverListHash})` + ) ) - ) - } + } + + // validatorList and standbyList need to be transformed into a ConsensusNodeInfo[] + const syncingNodeList: NodeList.ConsensusNodeInfo[] = [] + const activeNodeList: NodeList.ConsensusNodeInfo[] = [] - // validatorList and standbyList need to be transformed into a ConsensusNodeInfo[] - const syncingNodeList: NodeList.ConsensusNodeInfo[] = [] - const activeNodeList: NodeList.ConsensusNodeInfo[] = [] - for (const node of validatorList) { - if (node.status === 'selected' || node.status === 'syncing' || node.status === 'ready') { - syncingNodeList.push({ - publicKey: node.publicKey, - ip: node.externalIp, - port: node.externalPort, - id: node.id, - }) - } else if (node.status === 'active') { - activeNodeList.push({ - publicKey: node.publicKey, - ip: node.externalIp, - port: node.externalPort, - id: node.id, - }) + for (const node of validatorList) { + if (node.status === 'selected' || node.status === 'syncing' || node.status === 'ready') { + syncingNodeList.push({ + publicKey: node.publicKey, + ip: node.externalIp, + port: node.externalPort, + id: node.id, + }) + } else if (node.status === 'active') { + activeNodeList.push({ + publicKey: node.publicKey, + ip: node.externalIp, + port: node.externalPort, + id: node.id, + }) + } } - } - const standbyNodeList: NodeList.ConsensusNodeInfo[] = standbyList.map((joinRequest) => ({ - publicKey: joinRequest.nodeInfo.publicKey, - ip: joinRequest.nodeInfo.externalIp, - port: joinRequest.nodeInfo.externalPort, - })) - NodeList.addNodes(NodeList.NodeStatus.SYNCING, syncingNodeList) - NodeList.addNodes(NodeList.NodeStatus.ACTIVE, activeNodeList) - NodeList.addStandbyNodes(standbyNodeList) + const standbyNodeList: NodeList.ConsensusNodeInfo[] = standbyList.map((joinRequest) => ({ + publicKey: joinRequest.nodeInfo.publicKey, + ip: joinRequest.nodeInfo.externalIp, + port: joinRequest.nodeInfo.externalPort, + })) + NodeList.addNodes(NodeList.NodeStatus.SYNCING, syncingNodeList) + NodeList.addNodes(NodeList.NodeStatus.ACTIVE, activeNodeList) + NodeList.addStandbyNodes(standbyNodeList) - // return a cycle that we'll store in the database - return okAsync({ - ...cycle, - marker: cycleMarker, + // reset the active archivers list with the new list + resetActiveArchivers(archiverList) + + // return a cycle that we'll store in the database + return okAsync({ + ...cycle, + marker: cycleMarker, + }) }) - }) + ) ) ) ) @@ -186,3 +201,31 @@ function syncLatestCycleRecordAndMarker( ) ) } + +/** + * This function queries for an archiver list from other active nodes. + * + * @param {P2P.SyncTypes.ActiveNode[]} activeNodes - An array of active nodes to be queried. + * The function first performs a robust query for the latest archiver list hash. + * Then, it requests a full list from one of the winning nodes using the hash + * retrieved. The node receiving the request may or may not have the list whose + * hash matches the one requested. + * + * @returns {ResultAsync<[P2P.ArchiversTypes.JoinedArchiver[], hexstring], Error>} - A ResultAsync object. On success, it will contain an array of + * JoinedArchiver objects and the archiver list hash, and on error, it will contain an Error object. The function is asynchronous and can be awaited. + */ +function syncArchiverList( + activeNodes: P2PTypes.SyncTypes.ActiveNode[] +): ResultAsync<[P2PTypes.ArchiversTypes.JoinedArchiver[], hexstring], Error> { + // run a robust query for the lastest archiver list hash + return robustQueryForArchiverListHash(activeNodes).andThen(({ value, winningNodes }) => + // get full archiver list from one of the winning nodes + getArchiverListFromNode(winningNodes[0], value.archiverListHash).andThen((archiverList) => + // verify a hash of the retrieved archiver list matches the hash from before. + // if it does, return the archiver list + verifyArchiverList(archiverList, value.archiverListHash).map( + () => [archiverList, value.archiverListHash] as [P2PTypes.ArchiversTypes.JoinedArchiver[], hexstring] + ) + ) + ) +} diff --git a/src/sync-v2/verify.ts b/src/sync-v2/verify.ts index 7126a540..4c51b311 100644 --- a/src/sync-v2/verify.ts +++ b/src/sync-v2/verify.ts @@ -50,6 +50,14 @@ export function verifyStandbyList( return verify(standbyList, expectedHash, 'standby list') } +/** Verifies that the hash of the archiver list matches the expected hash. */ +export function verifyArchiverList( + archiverList: P2P.ArchiversTypes.JoinedArchiver[], + expectedHash: hexstring +): Result { + return verify(archiverList, expectedHash, 'archiver list') +} + /** Verifies that the hash of the cycle record matches the expected hash. */ export function verifyCycleRecord( cycleRecord: P2P.CycleCreatorTypes.CycleRecord,