Skip to content

Commit

Permalink
change the network topology to gossip mode
Browse files Browse the repository at this point in the history
  • Loading branch information
vanessaviolet committed Feb 5, 2024
1 parent 2aaa76d commit b8e3ddc
Show file tree
Hide file tree
Showing 13 changed files with 505 additions and 476 deletions.
41 changes: 6 additions & 35 deletions config/config.example.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
[node]
# the private spend key of the signer
signer-key = "8bcfad3959892e8334fa287a3c9755fed017cd7a9e8c68d7540dc9e69fa4a00d"
# limit the peers that can establish a connection and exchange snapshots
consensus-only = false
# the period in seconds to check some mint and election kernel opportunities
kernel-operation-period = 700
# the maximum cache size in MB
Expand All @@ -20,43 +18,16 @@ value-log-gc = true
max-compaction-levels = 7

[network]
# the public endpoint to receive peer packets, may be a proxy or load balancer
# must be a public reachable domain or IP, and the port allowed by firewall
listener = "mixin-node.example.com:7239"
# whether to gossip known neighbors to neighbors, and to connect neighbors gossiped
# by neighbors
gossip-neighbors = true
# a relayer needs a public address to listen and relay messages to other nodes
# a signer should set this value to false for security
relayer = false
# metric different message types sent and received
metric = false
# the nodes list
peers = [
"new-mixin-node0.exinpool.com:7239",
"new-mixin-node1.exinpool.com:7239",
"new-mixin-node2.exinpool.com:7239",
"new-mixin-node3.exinpool.com:7239",
"mixin-node-lehigh-1.hotot.org:7239",
"mixin-node-lehigh-2.hotot.org:7239",
"mixin-node-42.f1ex.io:7239",
"mixin-node-fes.f1ex.io:7239",
"mixin-node-box-1.b.watch:7239",
"mixin-node-box-2.b.watch:7239",
"mixin-node-box-3.b.watch:7239",
"mixin-node-box-4.b.watch:7239",
"mixin-node-okashi.mixin.fan:7239",
"mixin-node1.b1.run:7239",
"mixin-node2.b1.run:7239",
"mixin-node3.b1.run:7239",
"mixin-node4.b1.run:7239",
"mixin-node6.b1.run:7239",
"mixin-node7.b1.run:7239",
"mixin-node8b.b1.run:7239",
"34.42.197.136:7239",
"13.51.72.77:7239",
"3.227.254.217:7239",
"44.197.199.140:7239",
"16.170.250.120:7239",
"13.51.169.35:7239",
"43.206.154.20:7239"
"b798bca3d9aa61fed4cf39dee1e9b4317daf64a6e31cd433ac684d629f896b46@new-mixin-node0.exinpool.com:7239",
"3dae767383bd0eee3ff338e8d1e89577e4f3ece4b98662cb151f22d6c474407b@mixin-node-42.f1ex.io:7239",
"5e7ca75239ff68231bd0bcebc8be5b4725e8784b4df8788306c9baa291ec8595@mixin-node1.b1.run:7239"
]

[rpc]
Expand Down
8 changes: 3 additions & 5 deletions config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Custom struct {
Node struct {
Signer crypto.Key `toml:"-"`
SignerStr string `toml:"signer-key"`
ConsensusOnly bool `toml:"consensus-only"`
KernelOprationPeriod int `toml:"kernel-operation-period"`
MemoryCacheSize int `toml:"memory-cache-size"`
CacheTTL int `toml:"cache-ttl"`
Expand All @@ -51,10 +50,9 @@ type Custom struct {
MaxCompactionLevels int `toml:"max-compaction-levels"`
} `toml:"storage"`
Network struct {
Listener string `toml:"listener"`
GossipNeighbors bool `toml:"gossip-neighbors"`
Metric bool `toml:"metric"`
Peers []string `toml:"peers"`
Relayer bool `toml:"relayer"`
Metric bool `toml:"metric"`
Peers []string `toml:"peers"`
} `toml:"network"`
RPC struct {
Runtime bool `toml:"runtime"`
Expand Down
13 changes: 4 additions & 9 deletions kernel/boot.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package kernel

import (
"fmt"
"time"

"github.com/MixinNetwork/mixin/kernel/internal/clock"
)

func (node *Node) Loop() error {
err := node.PingNeighborsFromConfig()
err := node.addRelayersFromConfig()
if err != nil {
return err
}
go func() {
err := node.ListenNeighbors()
if err != nil {
panic(fmt.Errorf("ListenNeighbors %s", err.Error()))
}
}()
go node.LoopCacheQueue()
go node.listenConsumers()
go node.sendGraphToConcensusNodes()
go node.loopCacheQueue()
go node.MintLoop()
node.ElectionLoop()
return nil
Expand Down
3 changes: 2 additions & 1 deletion kernel/cosi.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ func (chain *Chain) cosiHandleResponse(m *CosiAction) error {
logger.Verbosef("cosiHandleResponse %v AGGREGATE ERROR\n", m)
return nil
}
logger.Verbosef("node.cacheVerifyCosi(%s, %s) FINAL\n", chain.node.Peer.Address, m.SnapshotHash)

if chain.IsPledging() && s.RoundNumber == 0 && cd.TX.TransactionType() == common.TransactionTypeNodeAccept {
err := chain.node.finalizeNodeAcceptSnapshot(s, signers)
Expand Down Expand Up @@ -1007,7 +1008,7 @@ func (node *Node) CosiAggregateSelfResponses(peerId crypto.Hash, snap crypto.Has
func (node *Node) VerifyAndQueueAppendSnapshotFinalization(peerId crypto.Hash, s *common.Snapshot) error {
s.Hash = s.PayloadHash()
logger.Debugf("VerifyAndQueueAppendSnapshotFinalization(%s, %s)\n", peerId, s.Hash)
if node.custom.Node.ConsensusOnly && node.GetAcceptedOrPledgingNode(peerId) == nil {
if node.GetAcceptedOrPledgingNode(peerId) == nil {
logger.Verbosef("VerifyAndQueueAppendSnapshotFinalization(%s, %s) invalid consensus peer\n",
peerId, s.Hash)
return nil
Expand Down
11 changes: 2 additions & 9 deletions kernel/genesis.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package kernel

import (
"github.com/MixinNetwork/mixin/common"
)

func (node *Node) LoadGenesis(configDir string) error {
gns, err := common.ReadGenesis(configDir + "/genesis.json")
if err != nil {
return err
}
import "github.com/MixinNetwork/mixin/common"

func (node *Node) LoadGenesis(gns *common.Genesis) error {
node.Epoch = gns.EpochTimestamp()
node.networkId = gns.NetworkId()
node.IdForNetwork = node.Signer.Hash().ForNetwork(node.networkId)
Expand Down
93 changes: 57 additions & 36 deletions kernel/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"fmt"
"sort"
"strings"
"sync"
"time"

Expand All @@ -20,7 +21,7 @@ import (
type Node struct {
IdForNetwork crypto.Hash
Signer common.Address
Listener string
isRelayer bool

Peer *network.Peer
TopoCounter *TopologicalSequence
Expand Down Expand Up @@ -68,7 +69,7 @@ type CNode struct {
ConsensusIndex int
}

func SetupNode(custom *config.Custom, persistStore storage.Store, cacheStore *ristretto.Cache, addr string, dir string) (*Node, error) {
func SetupNode(custom *config.Custom, persistStore storage.Store, cacheStore *ristretto.Cache, addr, dir string) (*Node, error) {
var node = &Node{
SyncPoints: &syncMap{mutex: new(sync.RWMutex), m: make(map[crypto.Hash]*network.SyncPoint)},
chains: &chainsMap{m: make(map[crypto.Hash]*Chain)},
Expand All @@ -90,7 +91,11 @@ func SetupNode(custom *config.Custom, persistStore storage.Store, cacheStore *ri
mint := node.lastMintDistribution()
node.LastMint = mint.Batch

err := node.LoadGenesis(dir)
gns, err := common.ReadGenesis(dir + "/genesis.json")
if err != nil {
return nil, fmt.Errorf("ReadGenesis(%s) => %v", dir, err)
}
err = node.LoadGenesis(gns)
if err != nil {
return nil, fmt.Errorf("LoadGenesis(%s) => %v", dir, err)
}
Expand Down Expand Up @@ -132,7 +137,7 @@ func (node *Node) loadNodeConfig() {
addr.PrivateViewKey = addr.PublicSpendKey.DeterministicHashDerive()
addr.PublicViewKey = addr.PrivateViewKey.Public()
node.Signer = addr
node.Listener = node.custom.Network.Listener
node.isRelayer = node.custom.Network.Relayer
}

func (node *Node) buildNodeStateSequences(allNodesSortedWithState []*CNode, acceptedOnly bool) []*NodeStateSequence {
Expand Down Expand Up @@ -326,31 +331,34 @@ func (node *Node) NewTransaction(assetId crypto.Hash) *common.Transaction {
return common.NewTransactionV5(assetId)
}

func (node *Node) PingNeighborsFromConfig() error {
gossip, metric := node.custom.Network.GossipNeighbors, node.custom.Network.Metric
node.Peer = network.NewPeer(node, node.IdForNetwork, node.addr, gossip, metric)
func (node *Node) addRelayersFromConfig() error {
node.Peer = network.NewPeer(node, node.IdForNetwork, node.addr, node.isRelayer)

for _, s := range node.custom.Network.Peers {
if s == node.Listener {
continue
parts := strings.Split(s, "@")
if len(parts) != 2 {
return fmt.Errorf("invalid peer %s", s)
}
node.Peer.PingNeighbor(s)
}
return nil
}

func (node *Node) UpdateNeighbors(neighbors []string) error {
for _, in := range neighbors {
if in == node.Listener {
nid, err := crypto.HashFromString(parts[0])
if err != nil {
return fmt.Errorf("invalid peer id %s", s)
}
if nid == node.IdForNetwork {
continue
}
node.Peer.PingNeighbor(in)
go node.Peer.ConnectRelayer(nid, parts[1])
}
return nil
}

func (node *Node) ListenNeighbors() error {
return node.Peer.ListenNeighbors()
func (node *Node) listenConsumers() {
if !node.isRelayer {
return
}
err := node.Peer.ListenConsumers()
if err != nil {
panic(err)
}
}

func (node *Node) NetworkId() crypto.Hash {
Expand Down Expand Up @@ -392,45 +400,45 @@ func (node *Node) BuildAuthenticationMessage() []byte {
data := make([]byte, 8)
binary.BigEndian.PutUint64(data, uint64(clock.Now().Unix()))
data = append(data, node.Signer.PublicSpendKey[:]...)
if node.isRelayer {
data = append(data, 1)
} else {
data = append(data, 0)
}
dh := crypto.Blake3Hash(data)
sig := node.Signer.PrivateSpendKey.Sign(dh)
data = append(data, sig[:]...)
return append(data, []byte(node.Listener)...)
return data
}

func (node *Node) Authenticate(msg []byte) (crypto.Hash, string, error) {
if len(msg) < 8+len(crypto.Hash{})+len(crypto.Signature{}) {
return crypto.Hash{}, "", fmt.Errorf("peer authentication message malformated %d", len(msg))
func (node *Node) Authenticate(msg []byte) (crypto.Hash, bool, error) {
if len(msg) != 105 {
return crypto.Hash{}, false, fmt.Errorf("peer authentication message malformated %d", len(msg))
}
ts := binary.BigEndian.Uint64(msg[:8])
if clock.Now().Unix()-int64(ts) > 3 {
return crypto.Hash{}, "", fmt.Errorf("peer authentication message timeout %d %d", ts, clock.Now().Unix())
return crypto.Hash{}, false, fmt.Errorf("peer authentication message timeout %d %d", ts, clock.Now().Unix())
}

var signer common.Address
copy(signer.PublicSpendKey[:], msg[8:40])
signer.PublicViewKey = signer.PublicSpendKey.DeterministicHashDerive().Public()
peerId := signer.Hash().ForNetwork(node.networkId)
if peerId == node.IdForNetwork {
return crypto.Hash{}, "", fmt.Errorf("peer authentication invalid consensus peer %s", peerId)
return crypto.Hash{}, false, fmt.Errorf("peer is self %s", peerId)
}
peer := node.GetAcceptedOrPledgingNode(peerId)

if node.custom.Node.ConsensusOnly && peer == nil {
return crypto.Hash{}, "", fmt.Errorf("peer authentication invalid consensus peer %s", peerId)
}
if peer != nil && peer.Signer.Hash() != signer.Hash() {
return crypto.Hash{}, "", fmt.Errorf("peer authentication invalid consensus peer %s", peerId)
return crypto.Hash{}, false, fmt.Errorf("peer authentication invalid consensus peer %s", peerId)
}

var sig crypto.Signature
copy(sig[:], msg[40:40+len(sig)])
mh := crypto.Blake3Hash(msg[:40])
copy(sig[:], msg[41:105])
mh := crypto.Blake3Hash(msg[:41])
if !signer.PublicSpendKey.Verify(mh, sig) {
return crypto.Hash{}, "", fmt.Errorf("peer authentication message signature invalid %s", peerId)
return crypto.Hash{}, false, fmt.Errorf("peer authentication message signature invalid %s", peerId)
}

listener := string(msg[40+len(sig):])
listener := msg[40] == byte(1)
return peerId, listener, nil
}

Expand Down Expand Up @@ -463,6 +471,19 @@ func (node *Node) ReadSnapshotsForNodeRound(nodeIdWithNetwork crypto.Hash, round
return node.persistStore.ReadSnapshotsForNodeRound(nodeIdWithNetwork, round)
}

func (node *Node) sendGraphToConcensusNodes() {
graphTicker := time.NewTicker(time.Duration(config.SnapshotRoundGap / 2))
defer graphTicker.Stop()

for {
nodes := node.NodesListWithoutState(uint64(clock.Now().UnixNano()), true)
for _, cn := range nodes {
node.Peer.SendGraphMessage(cn.IdForNetwork)
}
<-graphTicker.C
}
}

func (node *Node) UpdateSyncPoint(peerId crypto.Hash, points []*network.SyncPoint) {
for _, p := range points {
if p.NodeId == node.IdForNetwork {
Expand Down
2 changes: 1 addition & 1 deletion kernel/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (node *Node) QueueTransaction(tx *common.VersionedTransaction) (string, err
return tx.PayloadHash().String(), err
}

func (node *Node) LoopCacheQueue() error {
func (node *Node) loopCacheQueue() error {
defer close(node.cqc)

for {
Expand Down
Loading

0 comments on commit b8e3ddc

Please sign in to comment.