Skip to content

Commit

Permalink
Perf improvements for shard calculations. 5x faster. Test code for offline profiling and validation
Browse files Browse the repository at this point in the history
  • Loading branch information
afostr committed Oct 29, 2021
1 parent b64c5da commit c1494e0
Show file tree
Hide file tree
Showing 7 changed files with 619 additions and 38 deletions.
13 changes: 13 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@
"<node_internals>/**"
]
},
{
"type": "node",
"request": "launch",
"name": "shardPerfTest",
"program": "${workspaceFolder}\\test\\unit\\state-manager\\shardPerfTest.js",
//"port": 9229,
//"restart": true,
//"sourceMaps": true,
//"protocol": "inspector" ,
"skipFiles": [
"<node_internals>/**"
]
},
{
"type": "node",
"request": "launch",
Expand Down
4 changes: 3 additions & 1 deletion src/snapshot/snapshotFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ export async function calculateOldDataMap(
nodes,
partitionShardDataMap,
nodes,
true
true,
false // this is not the active node list. Perf will be slower so we may want to
// rework this calculation
)

// If we have old data, figure out which partitions we have and put into OldDataMap
Expand Down
22 changes: 12 additions & 10 deletions src/state-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,30 +470,32 @@ class StateManager {
// save this per cycle?
cycleShardData.shardGlobals = ShardFunctions.calculateShardGlobals(cycleShardData.activeNodes.length, this.config.sharding.nodesPerConsensusGroup as number, edgeNodes)

this.profiler.profileSectionStart('updateShardValues_computePartitionShardDataMap1')
this.profiler.profileSectionStart('updateShardValues_computePartitionShardDataMap1') //13ms, #:60
// partition shard data
ShardFunctions.computePartitionShardDataMap(cycleShardData.shardGlobals, cycleShardData.parititionShardDataMap, 0, cycleShardData.shardGlobals.numPartitions)
this.profiler.profileSectionEnd('updateShardValues_computePartitionShardDataMap1')

this.profiler.profileSectionStart('updateShardValues_computePartitionShardDataMap2')
this.profiler.profileSectionStart('updateShardValues_computePartitionShardDataMap2') //37ms, #:60
// generate limited data for all nodes data for all nodes.
ShardFunctions.computeNodePartitionDataMap(cycleShardData.shardGlobals, cycleShardData.nodeShardDataMap, cycleShardData.activeNodes, cycleShardData.parititionShardDataMap, cycleShardData.activeNodes, false)
this.profiler.profileSectionEnd('updateShardValues_computePartitionShardDataMap2')

this.profiler.profileSectionStart('updateShardValues_computeNodePartitionData')
this.profiler.profileSectionStart('updateShardValues_computeNodePartitionData') //22ms, #:60
// get extended data for our node
cycleShardData.nodeShardData = ShardFunctions.computeNodePartitionData(cycleShardData.shardGlobals, cycleShardData.ourNode, cycleShardData.nodeShardDataMap, cycleShardData.parititionShardDataMap, cycleShardData.activeNodes, true)
this.profiler.profileSectionEnd('updateShardValues_computeNodePartitionData')

this.profiler.profileSectionStart('updateShardValues_computeNodePartitionDataMap1')
// This is currently redundant; if we move to lazy init of extended data we should turn it back on
// this.profiler.profileSectionStart('updateShardValues_computeNodePartitionDataMap1') // 4ms, #:60
// TODO perf scalability need to generate this as needed in very large networks with millions of nodes.
// generate full data for nodes that store our home partition
ShardFunctions.computeNodePartitionDataMap(cycleShardData.shardGlobals, cycleShardData.nodeShardDataMap, cycleShardData.nodeShardData.nodeThatStoreOurParitionFull, cycleShardData.parititionShardDataMap, cycleShardData.activeNodes, true)
this.profiler.profileSectionEnd('updateShardValues_computeNodePartitionDataMap1')
//
// ShardFunctions.computeNodePartitionDataMap(cycleShardData.shardGlobals, cycleShardData.nodeShardDataMap, cycleShardData.nodeShardData.nodeThatStoreOurParitionFull, cycleShardData.parititionShardDataMap, cycleShardData.activeNodes, true, false)
// this.profiler.profileSectionEnd('updateShardValues_computeNodePartitionDataMap1')

// cycleShardData.nodeShardData = cycleShardData.nodeShardDataMap.get(cycleShardData.ourNode.id)

this.profiler.profileSectionStart('updateShardValues_computeNodePartitionDataMap2')
this.profiler.profileSectionStart('updateShardValues_computeNodePartitionDataMap2') //232ms, #:60
// generate lightweight data for all active nodes (note that last parameter is false to specify the lightweight data)
let fullDataForDebug = true // Set this to false for performance reasons!!! setting it to true saves us from having to recalculate stuff when we dump logs.
ShardFunctions.computeNodePartitionDataMap(cycleShardData.shardGlobals, cycleShardData.nodeShardDataMap, cycleShardData.activeNodes, cycleShardData.parititionShardDataMap, cycleShardData.activeNodes, fullDataForDebug)
Expand All @@ -507,7 +509,7 @@ class StateManager {

// calculate nodes that would just now start syncing edge data because the network shrank.
if (cycleShardData.ourNode.status === 'active') {
this.profiler.profileSectionStart('updateShardValues_getOrderedSyncingNeighbors')
this.profiler.profileSectionStart('updateShardValues_getOrderedSyncingNeighbors') //0
// calculate if there are any nearby nodes that are syncing right now.
if (logFlags.verbose) this.mainLogger.debug(`updateShardValues: getOrderedSyncingNeighbors`)
cycleShardData.syncingNeighbors = this.p2p.state.getOrderedSyncingNeighbors(cycleShardData.ourNode)
Expand Down Expand Up @@ -538,13 +540,13 @@ class StateManager {
// }
// this.preTXQueue = []
// }
this.profiler.profileSectionStart('updateShardValues_updateRuntimeSyncTrackers')
this.profiler.profileSectionStart('updateShardValues_updateRuntimeSyncTrackers') //0
this.accountSync.updateRuntimeSyncTrackers()
this.profiler.profileSectionEnd('updateShardValues_updateRuntimeSyncTrackers')
// this.calculateChangeInCoverage()
}

this.profiler.profileSectionStart('updateShardValues_getPartitionLists')
this.profiler.profileSectionStart('updateShardValues_getPartitionLists') // 0
// calculate our consensus partitions for use by data repair:
// cycleShardData.ourConsensusPartitions = []
let partitions = ShardFunctions.getConsenusPartitionList(cycleShardData.shardGlobals, cycleShardData.nodeShardData)
Expand Down
164 changes: 139 additions & 25 deletions src/state-manager/shardFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,23 +301,30 @@ class ShardFunctions {
nodesToGenerate: Shardus.Node[],
parititionShardDataMap: StateManager.shardFunctionTypes.ParititionShardDataMap,
activeNodes: Shardus.Node[],
extendedData: boolean
extendedData: boolean,
isActiveNodeList: boolean = true
) {
let index = 0
for (let node of nodesToGenerate) {
let nodeShardData = nodeShardDataMap.get(node.id)
if (!nodeShardData) {
nodeShardData = ShardFunctions.computeNodePartitionData(shardGlobals, node, nodeShardDataMap, parititionShardDataMap, activeNodes, false)
let thisNodeIndex = undefined
if(isActiveNodeList){
thisNodeIndex = index
}
nodeShardData = ShardFunctions.computeNodePartitionData(shardGlobals, node, nodeShardDataMap, parititionShardDataMap, activeNodes, false, thisNodeIndex)
}
index++
}
// second pass for extended data
for (let node of nodesToGenerate) {
let nodeShardData = nodeShardDataMap.get(node.id)
if (nodeShardData == null) {
//log error?
continue
}

if (extendedData) {
if (extendedData) {
index = 0
// second pass for extended data
for (let node of nodesToGenerate) {
let nodeShardData = nodeShardDataMap.get(node.id)
if (nodeShardData == null) {
//log error?
continue
}
ShardFunctions.computeExtendedNodePartitionData(shardGlobals, nodeShardDataMap, parititionShardDataMap, nodeShardData, activeNodes)
}
}
Expand All @@ -329,25 +336,31 @@ class ShardFunctions {
nodeShardDataMap: StateManager.shardFunctionTypes.NodeShardDataMap,
parititionShardDataMap: StateManager.shardFunctionTypes.ParititionShardDataMap,
activeNodes: Shardus.Node[],
extendedData?: boolean
extendedData?: boolean,
thisNodeIndex?: number
): StateManager.shardFunctionTypes.NodeShardData {
let numPartitions = shardGlobals.numPartitions

let nodeShardData = {} as StateManager.shardFunctionTypes.NodeShardData

nodeShardData.ourNodeIndex = activeNodes.findIndex(function (_node) {
return _node.id === node.id
})

if (nodeShardData.ourNodeIndex === -1) {
for (let i = 0; i < activeNodes.length; i++) {
nodeShardData.ourNodeIndex = i
if (activeNodes[i].id >= node.id) {
break
if(thisNodeIndex != undefined){
nodeShardData.ourNodeIndex = thisNodeIndex
} else{
//this is way too slow
nodeShardData.ourNodeIndex = activeNodes.findIndex(function (_node) {
return _node.id === node.id
})
//find closest index if our node is not in the active list. Not sure this is valid
//I think we need a more direct path for computing info on nodes that are not active yet
if (nodeShardData.ourNodeIndex === -1) {
for (let i = 0; i < activeNodes.length; i++) {
nodeShardData.ourNodeIndex = i
if (activeNodes[i].id >= node.id) {
break
}
}
}
}

let homePartition = nodeShardData.ourNodeIndex
let centeredAddress = Math.floor(((homePartition + 0.5) * 0xffffffff) / numPartitions)
let nodeAddressNum = centeredAddress
Expand Down Expand Up @@ -422,8 +435,20 @@ class ShardFunctions {
let exclude = [nodeShardData.node.id]
let excludeNodeArray = [nodeShardData.node]

// tried a better way but it dies of needing data we dont have yet..
nodeShardData.nodeThatStoreOurParition = ShardFunctions.getNodesThatCoverParitionRaw(shardGlobals, nodeShardDataMap, nodeShardData.homePartition, exclude, activeNodes)
nodeShardData.nodeThatStoreOurParition = ShardFunctions.getNodesThatCoverHomePartition(shardGlobals, nodeShardData, nodeShardDataMap, activeNodes)

// let temp = ShardFunctions.getNodesThatCoverPartitionRaw(shardGlobals, nodeShardDataMap, nodeShardData.homePartition, exclude, activeNodes)
// //temp validation that the functions above are equal
// let diffA = ShardFunctions.subtractNodeLists(temp, nodeShardData.nodeThatStoreOurParition)
// let diffB = ShardFunctions.subtractNodeLists(nodeShardData.nodeThatStoreOurParition, temp)
// if(diffA.length > 0){
// throw new Error( `diffA ${diffA.length}`)
// }
// if(diffB.length > 0){
// throw new Error( `diffB ${diffB.length}`)
// }


// nodeShardData.nodeThatStoreOurParition = ShardFunctions.getNodesThatCoverRange(shardGlobals, nodeShardData.storedPartitions.homeRange.low, nodeShardData.storedPartitions.homeRange.high, exclude, activeNodes)

// check if node is active because there are many calculations that are invalid or wrong if you try to compute them with a node that is not active in the network.
Expand Down Expand Up @@ -1146,7 +1171,7 @@ class ShardFunctions {
* @param {string[]} exclude
* @param {Node[]} activeNodes
*/
static getNodesThatCoverParitionRaw(shardGlobals: StateManager.shardFunctionTypes.ShardGlobals, nodeShardDataMap: Map<string, StateManager.shardFunctionTypes.NodeShardData>, partition: number, exclude: string[], activeNodes: Shardus.Node[]): Shardus.Node[] {
static getNodesThatCoverPartitionRaw(shardGlobals: StateManager.shardFunctionTypes.ShardGlobals, nodeShardDataMap: Map<string, StateManager.shardFunctionTypes.NodeShardData>, partition: number, exclude: string[], activeNodes: Shardus.Node[]): Shardus.Node[] {
let results = [] as Shardus.Node[]

// TODO perf. (may be tricky to improve), should probably be part of a comprehensive set of improvements that consider networks with millions of nodes
Expand Down Expand Up @@ -1175,6 +1200,95 @@ class ShardFunctions {
return results
}

/**
 * Fast replacement for getNodesThatCoverPartitionRaw when we already have the
 * node's own shard data: find the nodes that store (cover) thisNode's home
 * partition.
 *
 * Starting at the home partition's index it scans right, then left, through
 * activeNodes, collecting every node whose stored-partition range includes the
 * home partition, and stops in each direction at the first node that does not
 * cover it. This relies on coverage being contiguous around the home index and
 * on node index and home partition sharing the same index space —
 * NOTE(review): confirm those assumptions still hold if the partition/node
 * mapping ever changes.
 *
 * NOTE this is a raw answer. Edge cases with consensus node coverage can
 * increase the results beyond the raw answer given here.
 *
 * @param {StateManager.shardFunctionTypes.ShardGlobals} shardGlobals shard configuration globals
 * @param {StateManager.shardFunctionTypes.NodeShardData} thisNode shard data for the node whose home partition we are covering; thisNode itself is excluded from the results
 * @param {Map<string, StateManager.shardFunctionTypes.NodeShardData>} nodeShardDataMap per-node shard data; storedPartitions is computed lazily here when missing
 * @param {Node[]} activeNodes list of active nodes, indexed in the same order as partitions
 * @returns {Node[]} the nodes (other than thisNode) that store the home partition
 */
static getNodesThatCoverHomePartition(shardGlobals: StateManager.shardFunctionTypes.ShardGlobals, thisNode: StateManager.shardFunctionTypes.NodeShardData, nodeShardDataMap: Map<string, StateManager.shardFunctionTypes.NodeShardData>, activeNodes: Shardus.Node[]): Shardus.Node[] {
  let results = [] as Shardus.Node[]
  let homePartition = thisNode.homePartition

  // Pass 1: scan to the right (increasing index, wrapping at the end of the list).
  let searchRight = true
  let index = homePartition // node index and home partition share the same index space here
  //let failCount = 0
  let startIndex = index
  let once = false
  while (searchRight) {
    // if (failCount > 1) {
    //   searchRight = false
    //   break
    // }

    if (index >= activeNodes.length) {
      index = 0
    }
    // Check for a complete wrap: getting back to the start without a break
    // means every node covers the home partition, so we have full coverage and
    // the left pass would only produce duplicates. This only happens when
    // there is no sharding yet (numnodes * 2 = consensus range, approx). We
    // could break this logic out above in a separate check, but I think this
    // is ok for now (not too costly).
    if(startIndex === index && once){
      return results
    }
    once=true

    let node = activeNodes[index]
    index++

    // never include thisNode itself in the results
    if(node == thisNode.node)
      continue

    let nodeShardData = nodeShardDataMap.get(node.id)
    if (nodeShardData == null) {
      // no shard data yet for this node; skip it. log error?
      continue
    }
    if (nodeShardData.storedPartitions == null) {
      // lazily compute this node's stored partition ranges on first use
      nodeShardData.storedPartitions = ShardFunctions.calculateStoredPartitions2(shardGlobals, nodeShardData.homePartition)
    }
    if (ShardFunctions.testInRange(homePartition, nodeShardData.storedPartitions) !== true) {
      // first node that does not cover the home partition ends this direction
      //failCount++
      break
    }
    results.push(node)
  }
  // Pass 2: scan to the left (decreasing index, wrapping at the start of the list).
  let searchLeft = true
  index = homePartition
  //failCount = 0
  while (searchLeft) {
    // if (failCount > 2) {
    //   searchLeft = false
    //   break
    // }

    if (index < 0) {
      index = activeNodes.length - 1
    }
    let node = activeNodes[index]
    index--

    // never include thisNode itself in the results
    if(node == thisNode.node)
      continue


    let nodeShardData = nodeShardDataMap.get(node.id)
    if (nodeShardData == null) {
      // no shard data yet for this node; skip it. log error?
      continue
    }
    if (nodeShardData.storedPartitions == null) {
      // lazily compute this node's stored partition ranges on first use
      nodeShardData.storedPartitions = ShardFunctions.calculateStoredPartitions2(shardGlobals, nodeShardData.homePartition)
    }
    if (ShardFunctions.testInRange(homePartition, nodeShardData.storedPartitions) !== true) {
      // first node that does not cover the home partition ends this direction
      //failCount++
      break
    }
    results.push(node)
  }
  return results
}


/**
* getNeigborNodesInRange
* get nodes in count range to either side of our node
Expand Down
3 changes: 3 additions & 0 deletions test/unit/state-manager/shardCalculation2.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ const crypto = require('shardus-crypto-utils')
const utils = require('../../../build/src/utils')
crypto.init('69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc')

// const Profiler = require('../../../src/utils/profiler.js')
// let profiler = new Profiler()

let maxLogs = 100

let errorsLogged = 0
Expand Down
Loading

0 comments on commit c1494e0

Please sign in to comment.