Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add syncArchiversList in the sync-v2 #5

Merged
merged 2 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions src/Data/Cycles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,43 +328,15 @@ function updateNodeList(cycle: P2PTypes.CycleCreatorTypes.CycleData): void {
NodeList.removeNodes(lostAfterSelectionPks)

for (const joinedArchiver of joinedArchivers) {
const foundArchiver = State.activeArchivers.find((a) => a.publicKey === joinedArchiver.publicKey)
if (!foundArchiver) {
State.activeArchivers.push(joinedArchiver)
Utils.insertSorted(
State.activeArchiversByPublicKeySorted,
joinedArchiver,
NodeList.byAscendingPublicKey
)
Logger.mainLogger.debug(
'activeArchiversByPublicKeySorted',
State.activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey)
)
Logger.mainLogger.debug('New archiver added to active list', joinedArchiver)
}
Logger.mainLogger.debug('active archiver list', State.activeArchivers)
State.addArchiver(joinedArchiver)
}

for (const refreshedArchiver of refreshedArchivers) {
const foundArchiver = State.activeArchivers.find((a) => a.publicKey === refreshedArchiver.publicKey)
if (!foundArchiver) {
State.activeArchivers.push(refreshedArchiver)
Utils.insertSorted(
State.activeArchiversByPublicKeySorted,
refreshedArchiver,
NodeList.byAscendingPublicKey
)
Logger.mainLogger.debug(
'activeArchiversByPublicKeySorted',
State.activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey)
)
Logger.mainLogger.debug('Refreshed archiver added to active list', refreshedArchiver)
}
State.addArchiver(refreshedArchiver)
}

for (const leavingArchiver of leavingArchivers) {
State.removeActiveArchiver(leavingArchiver.publicKey)
State.archiversReputation.delete(leavingArchiver.publicKey)
}

const nodesToUnsubscribed = [...apoptosizedPks, ...removedPks]
Expand Down
27 changes: 17 additions & 10 deletions src/State.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,7 @@
continue
}
if (response && response.nodeList && response.nodeList.length > 0) {
activeArchivers.push(existingArchivers[i])
Utils.insertSorted(
activeArchiversByPublicKeySorted,
existingArchivers[i],
NodeList.byAscendingPublicKey
)
Logger.mainLogger.debug(
'activeArchiversByPublicKeySorted',
activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey)
)
addArchiver(existingArchivers[i])
}
}
/* eslint-enable security/detect-object-injection */
Expand Down Expand Up @@ -162,11 +153,27 @@
Logger.mainLogger.debug('Registerd exit signal listeners.')
}

export function addArchiver(archiver: ArchiverNodeInfo): void {
const foundArchiver = activeArchivers.find((a) => a.publicKey === archiver.publicKey)
if (!foundArchiver) {
Logger.mainLogger.debug('Adding archiver', archiver)

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
activeArchivers.push(archiver)
Utils.insertSorted(activeArchiversByPublicKeySorted, archiver, NodeList.byAscendingPublicKey)
Logger.mainLogger.debug(
'activeArchiversByPublicKeySorted',
activeArchiversByPublicKeySorted.map((archiver) => archiver.publicKey)

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
)
Logger.mainLogger.debug('New archiver added to active list', archiver)

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
}
Logger.mainLogger.debug('archivers list', activeArchivers)

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
}

export function removeActiveArchiver(publicKey: string): void {
activeArchivers = activeArchivers.filter((a: ArchiverNodeInfo) => a.publicKey !== publicKey)
activeArchiversByPublicKeySorted = activeArchiversByPublicKeySorted.filter(
(a: ArchiverNodeInfo) => a.publicKey !== publicKey
)
archiversReputation.delete(publicKey)
}

export function resetActiveArchivers(archivers: ArchiverNodeInfo[]): void {
Expand Down
145 changes: 94 additions & 51 deletions src/sync-v2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import {
getValidatorListFromNode,
robustQueryForStandbyNodeListHash,
getStandbyNodeListFromNode,
robustQueryForArchiverListHash,
getArchiverListFromNode,
} from './queries'
import { ArchiverNodeInfo } from '../State'
import { ArchiverNodeInfo, resetActiveArchivers } from '../State'
import { getActiveNodeListFromArchiver } from '../NodeList'
import * as NodeList from '../NodeList'
import { verifyCycleRecord, verifyValidatorList } from './verify'
import { verifyArchiverList, verifyCycleRecord, verifyValidatorList } from './verify'
import * as Logger from '../Logger'

/**
Expand Down Expand Up @@ -53,62 +55,75 @@ export function syncV2(
return ResultAsync.fromPromise(getActiveListFromSomeArchiver(activeArchivers), (e: Error) => e).andThen(
(nodeList) =>
syncValidatorList(nodeList).andThen(([validatorList, validatorListHash]) =>
syncStandbyNodeList(nodeList).andThen(([standbyList, standbyListHash]) =>
syncLatestCycleRecordAndMarker(nodeList).andThen(([cycle, cycleMarker]) => {
Logger.mainLogger.debug('syncV2: validatorList', validatorList)
syncArchiverList(nodeList).andThen(([archiverList, archiverListHash]) =>
syncStandbyNodeList(nodeList).andThen(([standbyList, standbyListHash]) =>
syncLatestCycleRecordAndMarker(nodeList).andThen(([cycle, cycleMarker]) => {
Logger.mainLogger.debug('syncV2: validatorList', validatorList)

// additional checks to make sure the list hashes in the cycle
// matches the hash for the validator list retrieved earlier
if (cycle.nodeListHash !== validatorListHash) {
return errAsync(
new Error(
`validator list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
// additional checks to make sure the list hashes in the cycle
// matches the hash for the validator list retrieved earlier
if (cycle.nodeListHash !== validatorListHash) {
return errAsync(
new Error(
`validator list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
)
)
)
}
if (cycle.standbyNodeListHash !== standbyListHash) {
return errAsync(
new Error(
`standby list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
}
if (cycle.standbyNodeListHash !== standbyListHash) {
return errAsync(
new Error(
`standby list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
)
)
}
if (cycle.archiverListHash !== archiverListHash) {
return errAsync(
new Error(
`archiver list hash from received cycle (${cycle.archiverListHash}) does not match the hash received from robust query (${archiverListHash})`
)
)
)
}
}

// validatorList and standbyList need to be transformed into a ConsensusNodeInfo[]
const syncingNodeList: NodeList.ConsensusNodeInfo[] = []
const activeNodeList: NodeList.ConsensusNodeInfo[] = []

// validatorList and standbyList need to be transformed into a ConsensusNodeInfo[]
const syncingNodeList: NodeList.ConsensusNodeInfo[] = []
const activeNodeList: NodeList.ConsensusNodeInfo[] = []
for (const node of validatorList) {
if (node.status === 'selected' || node.status === 'syncing' || node.status === 'ready') {
syncingNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
} else if (node.status === 'active') {
activeNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
for (const node of validatorList) {
if (node.status === 'selected' || node.status === 'syncing' || node.status === 'ready') {
syncingNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
} else if (node.status === 'active') {
activeNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
}
}
}
const standbyNodeList: NodeList.ConsensusNodeInfo[] = standbyList.map((joinRequest) => ({
publicKey: joinRequest.nodeInfo.publicKey,
ip: joinRequest.nodeInfo.externalIp,
port: joinRequest.nodeInfo.externalPort,
}))
NodeList.addNodes(NodeList.NodeStatus.SYNCING, syncingNodeList)
NodeList.addNodes(NodeList.NodeStatus.ACTIVE, activeNodeList)
NodeList.addStandbyNodes(standbyNodeList)
const standbyNodeList: NodeList.ConsensusNodeInfo[] = standbyList.map((joinRequest) => ({
publicKey: joinRequest.nodeInfo.publicKey,
ip: joinRequest.nodeInfo.externalIp,
port: joinRequest.nodeInfo.externalPort,
}))
NodeList.addNodes(NodeList.NodeStatus.SYNCING, syncingNodeList)
NodeList.addNodes(NodeList.NodeStatus.ACTIVE, activeNodeList)
NodeList.addStandbyNodes(standbyNodeList)

// return a cycle that we'll store in the database
return okAsync({
...cycle,
marker: cycleMarker,
// reset the active archivers list with the new list
resetActiveArchivers(archiverList)

// return a cycle that we'll store in the database
return okAsync({
...cycle,
marker: cycleMarker,
})
})
})
)
)
)
)
Expand Down Expand Up @@ -186,3 +201,31 @@ function syncLatestCycleRecordAndMarker(
)
)
}

/**
* This function queries for an archiver list from other active nodes.
*
* @param {P2P.SyncTypes.ActiveNode[]} activeNodes - An array of active nodes to be queried.
* The function first performs a robust query for the latest archiver list hash.
* Then, it requests a full list from one of the winning nodes using the hash
* retrieved. The node receiving the request may or may not have the list whose
* hash matches the one requested.
*
* @returns {ResultAsync<[P2P.ArchiversTypes.JoinedArchiver[], hexstring], Error>} - A ResultAsync object. On success, it will contain an array of
* JoinedArchiver objects and the archiver list hash, and on error, it will contain an Error object. The function is asynchronous and can be awaited.
*/
function syncArchiverList(
activeNodes: P2PTypes.SyncTypes.ActiveNode[]
): ResultAsync<[P2PTypes.ArchiversTypes.JoinedArchiver[], hexstring], Error> {
// run a robust query for the lastest archiver list hash
return robustQueryForArchiverListHash(activeNodes).andThen(({ value, winningNodes }) =>
// get full archiver list from one of the winning nodes
getArchiverListFromNode(winningNodes[0], value.archiverListHash).andThen((archiverList) =>
// verify a hash of the retrieved archiver list matches the hash from before.
// if it does, return the archiver list
verifyArchiverList(archiverList, value.archiverListHash).map(
() => [archiverList, value.archiverListHash] as [P2PTypes.ArchiversTypes.JoinedArchiver[], hexstring]
)
)
)
}
8 changes: 8 additions & 0 deletions src/sync-v2/verify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ export function verifyStandbyList(
return verify(standbyList, expectedHash, 'standby list')
}

/** Verifies that the hash of the archiver list matches the expected hash. */
export function verifyArchiverList(
archiverList: P2P.ArchiversTypes.JoinedArchiver[],
expectedHash: hexstring
): Result<boolean, Error> {
return verify(archiverList, expectedHash, 'archiver list')
}

/** Verifies that the hash of the cycle record matches the expected hash. */
export function verifyCycleRecord(
cycleRecord: P2P.CycleCreatorTypes.CycleRecord,
Expand Down
Loading