Skip to content

Commit

Permalink
implement ServiceQueue module
Browse files Browse the repository at this point in the history
write updateNetworkTxsList func

create /network-txs-list endpoint

fix: improper logic in case txRemove tx is not in txList

ServiceQueue: updated sortedInsert() definition and calls

ServiceQueue: fix Logger import

add verify function for syncing txList

add queries for syncing txList from validators

add syncTxList func + use it in syncV2 step

add ServiceQueue funcs + minor refactor

handle case where txList isnt correct post-cycle record parse
  • Loading branch information
ahmxdiqbal authored and afostr committed Sep 11, 2024
1 parent 9a14537 commit 41b5c1f
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 57 deletions.
10 changes: 10 additions & 0 deletions src/API.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
successReceiptCount,
failureReceiptCount,
} from './primary-process'
import * as ServiceQueue from './ServiceQueue'
const { version } = require('../package.json') // eslint-disable-line @typescript-eslint/no-var-requires

const TXID_LENGTH = 64
Expand Down Expand Up @@ -177,6 +178,15 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
profilerInstance.profileSectionEnd('GET_nodelist')
})

server.get('/network-txs-list', (_request, reply) => {
profilerInstance.profileSectionStart('network-txs-list')
nestedCountersInstance.countEvent('consensor', 'network-txs-list')

const res = ServiceQueue.getTxList()
reply.send(res)
profilerInstance.profileSectionEnd('network-txs-list')
})

type FullNodeListRequest = FastifyRequest<{
Querystring: {
activeOnly: 'true' | 'false'
Expand Down
30 changes: 30 additions & 0 deletions src/Data/Cycles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { getJson, postJson } from '../P2P'
import { profilerInstance } from '../profiler/profiler'
import { nestedCountersInstance } from '../profiler/nestedCounters'
import * as cycleDataCache from '../cache/cycleRecordsCache'
import * as ServiceQueue from '../ServiceQueue'

import {
clearDataSenders,
Expand All @@ -30,6 +31,7 @@ import { stringifyReduce } from '../profiler/StringifyReduce'
import { addCyclesToCache } from '../cache/cycleRecordsCache'
import { queryLatestCycleRecords } from '../dbstore/cycles'
import { updateGlobalNetworkAccount } from '../GlobalAccount'
import { syncTxList } from '../sync-v2'

export interface ArchiverCycleResponse {
cycleInfo: P2PTypes.CycleCreatorTypes.CycleData[]
Expand Down Expand Up @@ -69,6 +71,7 @@ export async function processCycles(cycles: P2PTypes.CycleCreatorTypes.CycleData

// Update NodeList from cycle info
updateNodeList(cycle)
updateNetworkTxsList(cycle)
updateShardValues(cycle)
changeNetworkMode(cycle.mode)
getAdjacentLeftAndRightArchivers()
Expand Down Expand Up @@ -353,6 +356,33 @@ function updateNodeList(cycle: P2PTypes.CycleCreatorTypes.CycleData): void {
}
}

async function updateNetworkTxsList(cycle: P2PTypes.CycleCreatorTypes.CycleData): Promise<void> {
const {
txadd,
txremove
} = cycle

ServiceQueue.addTxs(txadd)
ServiceQueue.removeTxs(txremove)

const calculatedTxListHash = ServiceQueue.getNetworkTxsListHash()

if (calculatedTxListHash !== cycle.txlisthash) {
console.error('txList hash from cycle record does not match the calculated txList hash')
const syncTxListResult = await syncTxList(NodeList.getActiveList())

syncTxListResult.match(
(txList) => {
Logger.mainLogger.debug("Successfully synced txList from validators"),
ServiceQueue.setTxList(txList)
},
(error) => {
Logger.mainLogger.error("Failed to synchronize transaction list:", error.message);
}
);
}
}

export async function fetchCycleRecords(
start: number,
end: number
Expand Down
73 changes: 73 additions & 0 deletions src/ServiceQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { P2P } from "@shardus/types";
import * as Logger from './Logger'
import { stringifyReduce } from "./profiler/StringifyReduce";
import * as crypto from './Crypto'

let txList: P2P.ServiceQueueTypes.NetworkTxEntry[] = []

export function addTxs(addTxs: P2P.ServiceQueueTypes.AddNetworkTx[]): boolean {
try {
for (const addTx of addTxs) {
Logger.mainLogger.info(`Adding network tx of type ${addTx.type} and payload ${stringifyReduce(addTx.txData)}`)

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
const { sign, ...txDataWithoutSign } = addTx.txData
sortedInsert(txList, {
hash: addTx.hash,
tx: {
hash: addTx.hash,
txData: txDataWithoutSign,
type: addTx.type,
cycle: addTx.cycle,
...(addTx.subQueueKey && { subQueueKey: addTx.subQueueKey }),
},
})
}
return true
} catch (e) {
Logger.mainLogger.error(`ServiceQueue:addTxs: Error adding txs: ${e}`)
return false
}
}

export function removeTxs(removeTxs: P2P.ServiceQueueTypes.RemoveNetworkTx[]): boolean {
try {
for (const removeTx of removeTxs) {
const index = txList.findIndex((entry) => entry.hash === removeTx.txHash)
if (index === -1) {
Logger.mainLogger.error(`TxHash ${removeTx.txHash} does not exist in txList`)

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
} else {
txList.splice(index, 1)
}
}
return true
} catch (e) {
Logger.mainLogger.error(`ServiceQueue:removeTxs: Error removing txs: ${e}`)
return false
}
}

export function setTxList(_txList: P2P.ServiceQueueTypes.NetworkTxEntry[]): void {
txList = _txList
}

export function getTxList(): P2P.ServiceQueueTypes.NetworkTxEntry[] {
return txList
}

export function getNetworkTxsListHash(): string {
return crypto.hashObj(txList)
}

function sortedInsert(
list: P2P.ServiceQueueTypes.NetworkTxEntry[],
entry: P2P.ServiceQueueTypes.NetworkTxEntry
): void {
const index = list.findIndex(
(item) =>
item.tx.cycle > entry.tx.cycle || (item.tx.cycle === entry.tx.cycle && item.hash > entry.tx.hash)
)
if (index === -1) {
list.push(entry)
} else {
list.splice(index, 0, entry)
}
}
130 changes: 73 additions & 57 deletions src/sync-v2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ import {
getStandbyNodeListFromNode,
robustQueryForArchiverListHash,
getArchiverListFromNode,
robustQueryForTxListHash,
getTxListFromNode,
} from './queries'
import { ArchiverNodeInfo, resetActiveArchivers } from '../State'
import { getActiveNodeListFromArchiver } from '../NodeList'
import * as NodeList from '../NodeList'
import { verifyArchiverList, verifyCycleRecord, verifyValidatorList } from './verify'
import { verifyArchiverList, verifyCycleRecord, verifyValidatorList, verifyTxList } from './verify'
import * as Logger from '../Logger'
import * as ServiceQueue from '../ServiceQueue'

/**
* Given a list of archivers, queries each one until one returns an active node list.
Expand Down Expand Up @@ -57,72 +60,77 @@ export function syncV2(
syncValidatorList(nodeList).andThen(([validatorList, validatorListHash]) =>
syncArchiverList(nodeList).andThen(([archiverList, archiverListHash]) =>
syncStandbyNodeList(nodeList).andThen(([standbyList, standbyListHash]) =>
syncLatestCycleRecordAndMarker(nodeList).andThen(([cycle, cycleMarker]) => {
Logger.mainLogger.debug('syncV2: validatorList', validatorList)
syncTxList(nodeList).andThen((txList) =>
syncLatestCycleRecordAndMarker(nodeList).andThen(([cycle, cycleMarker]) => {
Logger.mainLogger.debug('syncV2: validatorList', validatorList)

// additional checks to make sure the list hashes in the cycle
// matches the hash for the validator list retrieved earlier
if (cycle.nodeListHash !== validatorListHash) {
return errAsync(
new Error(
`validator list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
// additional checks to make sure the list hashes in the cycle
// matches the hash for the validator list retrieved earlier
if (cycle.nodeListHash !== validatorListHash) {
return errAsync(
new Error(
`validator list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
)
)
)
}
if (cycle.standbyNodeListHash !== standbyListHash) {
return errAsync(
new Error(
`standby list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
}
if (cycle.standbyNodeListHash !== standbyListHash) {
return errAsync(
new Error(
`standby list hash from received cycle (${cycle.nodeListHash}) does not match the hash received from robust query (${validatorListHash})`
)
)
)
}
if (cycle.archiverListHash !== archiverListHash) {
return errAsync(
new Error(
`archiver list hash from received cycle (${cycle.archiverListHash}) does not match the hash received from robust query (${archiverListHash})`
}
if (cycle.archiverListHash !== archiverListHash) {
return errAsync(
new Error(
`archiver list hash from received cycle (${cycle.archiverListHash}) does not match the hash received from robust query (${archiverListHash})`
)
)
)
}
}

// validatorList and standbyList need to be transformed into a ConsensusNodeInfo[]
const syncingNodeList: NodeList.ConsensusNodeInfo[] = []
const activeNodeList: NodeList.ConsensusNodeInfo[] = []
// validatorList and standbyList need to be transformed into a ConsensusNodeInfo[]
const syncingNodeList: NodeList.ConsensusNodeInfo[] = []
const activeNodeList: NodeList.ConsensusNodeInfo[] = []

for (const node of validatorList) {
if (node.status === 'selected' || node.status === 'syncing' || node.status === 'ready') {
syncingNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
} else if (node.status === 'active') {
activeNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
for (const node of validatorList) {
if (node.status === 'selected' || node.status === 'syncing' || node.status === 'ready') {
syncingNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
} else if (node.status === 'active') {
activeNodeList.push({
publicKey: node.publicKey,
ip: node.externalIp,
port: node.externalPort,
id: node.id,
})
}
}
}
const standbyNodeList: NodeList.ConsensusNodeInfo[] = standbyList.map((joinRequest) => ({
publicKey: joinRequest.nodeInfo.publicKey,
ip: joinRequest.nodeInfo.externalIp,
port: joinRequest.nodeInfo.externalPort,
}))
NodeList.addNodes(NodeList.NodeStatus.SYNCING, syncingNodeList)
NodeList.addNodes(NodeList.NodeStatus.ACTIVE, activeNodeList)
NodeList.addStandbyNodes(standbyNodeList)
const standbyNodeList: NodeList.ConsensusNodeInfo[] = standbyList.map((joinRequest) => ({
publicKey: joinRequest.nodeInfo.publicKey,
ip: joinRequest.nodeInfo.externalIp,
port: joinRequest.nodeInfo.externalPort,
}))
NodeList.addNodes(NodeList.NodeStatus.SYNCING, syncingNodeList)
NodeList.addNodes(NodeList.NodeStatus.ACTIVE, activeNodeList)
NodeList.addStandbyNodes(standbyNodeList)

// add txList
ServiceQueue.setTxList(txList)

// reset the active archivers list with the new list
resetActiveArchivers(archiverList)
// reset the active archivers list with the new list
resetActiveArchivers(archiverList)

// return a cycle that we'll store in the database
return okAsync({
...cycle,
marker: cycleMarker,
// return a cycle that we'll store in the database
return okAsync({
...cycle,
marker: cycleMarker,
})
})
})
)
)
)
)
Expand Down Expand Up @@ -177,6 +185,14 @@ function syncStandbyNodeList(
)
}

export function syncTxList(activeNodes: P2PTypes.SyncTypes.ActiveNode[]): ResultAsync<P2PTypes.ServiceQueueTypes.NetworkTxEntry[], Error> {
return robustQueryForTxListHash(activeNodes).andThen(({ value, winningNodes }) =>
getTxListFromNode(winningNodes[0], value.txListHash).andThen((txList) =>
verifyTxList(txList, value.txListHash).map(() => txList)
)
)
}

/**
* Synchronizes the latest cycle record from a list of active nodes.
*
Expand Down
17 changes: 17 additions & 0 deletions src/sync-v2/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ export function robustQueryForStandbyNodeListHash(
return makeRobustQueryCall(nodes, 'standby-list-hash')
}

/** Executes a robust query to retrieve the txList hash from the network. */
export function robustQueryForTxListHash(
nodes: ActiveNode[]
): RobustQueryResultAsync<{ txListHash: hexstring }> {
return makeRobustQueryCall(nodes, 'tx-list-hash')
}

/** Retrives the entire last cycle from the node. */
export function getCurrentCycleDataFromNode(
node: ActiveNode,
Expand Down Expand Up @@ -185,3 +192,13 @@ export function getStandbyNodeListFromNode(
hash: expectedHash,
})
}

/** Gets the full tx list from the specified node */
export function getTxListFromNode(
node: ActiveNode,
expectedHash: hexstring
): ResultAsync<P2P.ServiceQueueTypes.NetworkTxEntry[], Error> {
return attemptSimpleFetch(node, 'tx-list', {
hash: expectedHash,
})
}
14 changes: 14 additions & 0 deletions src/sync-v2/verify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ export function verifyArchiverList(
return verify(archiverList, expectedHash, 'archiver list')
}

/** Verifies that the hash of the tx list matches the expected hash. */
export function verifyTxList(
txList: P2P.ServiceQueueTypes.NetworkTxEntry[],
expectedHash: string
): Result<boolean, Error> {
const actualHash = hashObj(txList)

// verify that the hash of the CycleRecord matches the expected hash
if (actualHash !== expectedHash)
return err(new Error(`hash mismatch for txList: expected ${expectedHash}, got ${actualHash}`))

return ok(true)
}

/** Verifies that the hash of the cycle record matches the expected hash. */
export function verifyCycleRecord(
cycleRecord: P2P.CycleCreatorTypes.CycleRecord,
Expand Down

0 comments on commit 41b5c1f

Please sign in to comment.