Skip to content

Commit

Permalink
Add syncArchiversList logic in the sync-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
tanuj-shardeum authored and jairajdev committed Apr 23, 2024
1 parent 1d1bd2a commit c752ae6
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 52 deletions.
157 changes: 105 additions & 52 deletions src/sync-v2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ import {
getValidatorListFromNode,
robustQueryForStandbyNodeListHash,
getStandbyNodeListFromNode,
robustQueryForArchiverListHash,
getArchiverListFromNode,
} from './queries'
import { ArchiverNodeInfo } from '../State'
import { ArchiverNodeInfo, activeArchiversByPublicKeySorted } from '../State'
import { getActiveNodeListFromArchiver } from '../NodeList'
import * as NodeList from '../NodeList'
import { verifyCycleRecord, verifyValidatorList } from './verify'
import { verifyArchiverList, verifyCycleRecord, verifyValidatorList } from './verify'
import * as Logger from '../Logger'
import * as Utils from '../Utils'

/**
* Given a list of archivers, queries each one until one returns an active node list.
Expand Down Expand Up @@ -53,64 +56,86 @@ export function syncV2(
return ResultAsync.fromPromise(getActiveListFromSomeArchiver(activeArchivers), (e: Error) => e).andThen(
(nodeList) =>
syncValidatorList(nodeList).andThen(([validatorList, validatorListHash]) =>
syncStandbyNodeList(nodeList).andThen(([standbyList, standbyListHash]) =>
syncLatestCycleRecordAndMarker(nodeList).andThen(([cycle, cycleMarker]) => {
Logger.mainLogger.debug('syncV2: validatorList', validatorList)
syncArchiverList(nodeList).andThen(([archiverList, archiverListHash]) =>
syncStandbyNodeList(nodeList).andThen(([standbyList, standbyListHash]) =>
syncLatestCycleRecordAndMarker(nodeList).andThen(([cycle, cycleMarker]) => {
Logger.mainLogger.debug('syncV2: validatorList', validatorList)

// additional checks to make sure the list hashes in the cycle
// matches the hash for the validator list retrieved earlier
if (cycle.nodeListHash !== validatorListHash) {
return errAsync(
new Error(
`validator list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
// additional checks to make sure the list hashes in the cycle
// matches the hash for the validator list retrieved earlier
if (cycle.nodeListHash !== validatorListHash) {
return errAsync(
new Error(
`validator list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
)
)
)
}
if (cycle.standbyNodeListHash !== standbyListHash) {
return errAsync(
new Error(
`standby list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
}
if (cycle.standbyNodeListHash !== standbyListHash) {
return errAsync(
new Error(
`standby list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
)
)
}
if (cycle.archiverListHash !== archiverListHash) {
return errAsync(
new Error(
`archiver list hash from received cycle (${cycle.archiverListHash}) does not match the hash received from robust query (${archiverListHash})`
)
)
)
}
}

// validatorList and standbyList need to be transformed into a ConsensusNodeInfo[]
const syncingNodeList: NodeList.ConsensusNodeInfo[] = []
const activeNodeList: NodeList.ConsensusNodeInfo[] = []

// validatorList and standbyList need to be transformed into a ConsensusNodeInfo[]
const syncingNodeList: NodeList.ConsensusNodeInfo[] = []
const activeNodeList: NodeList.ConsensusNodeInfo[] = []
for (const node of validatorList) {
if (node.status === 'selected' || node.status === 'syncing' || node.status === 'ready') {
syncingNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
} else if (node.status === 'active') {
activeNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
for (const node of validatorList) {
if (node.status === 'selected' || node.status === 'syncing' || node.status === 'ready') {
syncingNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
} else if (node.status === 'active') {
activeNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
}
}
const standbyNodeList: NodeList.ConsensusNodeInfo[] = standbyList.map((joinRequest) => ({
publicKey: joinRequest.nodeInfo.publicKey,
ip: joinRequest.nodeInfo.externalIp,
port: joinRequest.nodeInfo.externalPort,
}))
NodeList.addNodes(NodeList.NodeStatus.SYNCING, syncingNodeList)
NodeList.addNodes(NodeList.NodeStatus.ACTIVE, activeNodeList)
NodeList.addStandbyNodes(standbyNodeList)

// add archivers
for (const archiver of archiverList) {
if (!activeArchivers.find(obj => obj.publicKey === archiver.publicKey)) {
activeArchivers.push(archiver)
Utils.insertSorted(
activeArchiversByPublicKeySorted,
archiver,
NodeList.byAscendingPublicKey
)
}
}
}
const standbyNodeList: NodeList.ConsensusNodeInfo[] = standbyList.map((joinRequest) => ({
publicKey: joinRequest.nodeInfo.publicKey,
ip: joinRequest.nodeInfo.externalIp,
port: joinRequest.nodeInfo.externalPort,
}))
NodeList.addNodes(NodeList.NodeStatus.SYNCING, syncingNodeList)
NodeList.addNodes(NodeList.NodeStatus.ACTIVE, activeNodeList)
NodeList.addStandbyNodes(standbyNodeList)

// return a cycle that we'll store in the database
return okAsync({
...cycle,
marker: cycleMarker,
// return a cycle that we'll store in the database
return okAsync({
...cycle,
marker: cycleMarker,
})
})
})
)
)
)
)
)
}

Expand Down Expand Up @@ -186,3 +211,31 @@ function syncLatestCycleRecordAndMarker(
)
)
}

/**
* This function queries for an archiver list from other active nodes.
*
* @param {P2P.SyncTypes.ActiveNode[]} activeNodes - An array of active nodes to be queried.
* The function first performs a robust query for the latest archiver list hash.
* Then, it requests a full list from one of the winning nodes using the hash
* retrieved. The node receiving the request may or may not have the list whose
* hash matches the one requested.
*
* @returns {ResultAsync<[P2P.ArchiversTypes.JoinedArchiver[], hexstring], Error>} - A ResultAsync object. On success, it will contain an array of
* JoinedArchiver objects and the archiver list hash, and on error, it will contain an Error object. The function is asynchronous and can be awaited.
*/
function syncArchiverList(
activeNodes: P2PTypes.SyncTypes.ActiveNode[]
): ResultAsync<[P2PTypes.ArchiversTypes.JoinedArchiver[], hexstring], Error> {
// run a robust query for the lastest archiver list hash
return robustQueryForArchiverListHash(activeNodes).andThen(({ value, winningNodes }) =>
// get full archiver list from one of the winning nodes
getArchiverListFromNode(winningNodes[0], value.archiverListHash).andThen((archiverList) =>
// verify a hash of the retrieved archiver list matches the hash from before.
// if it does, return the archiver list
verifyArchiverList(archiverList, value.archiverListHash).map(
() => [archiverList, value.archiverListHash] as [P2PTypes.ArchiversTypes.JoinedArchiver[], hexstring]
)
)
)
}
8 changes: 8 additions & 0 deletions src/sync-v2/verify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ export function verifyStandbyList(
return verify(standbyList, expectedHash, 'standby list')
}

/** Verifies that the hash of the archiver list matches the expected hash. */
export function verifyArchiverList(
archiverList: P2P.ArchiversTypes.JoinedArchiver[],
expectedHash: hexstring
): Result<boolean, Error> {
return verify(archiverList, expectedHash, 'archiver list')
}

/** Verifies that the hash of the cycle record matches the expected hash. */
export function verifyCycleRecord(
cycleRecord: P2P.CycleCreatorTypes.CycleRecord,
Expand Down

0 comments on commit c752ae6

Please sign in to comment.