diff --git a/go.mod b/go.mod index 82b5fbdc9..246b80eba 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/MixinNetwork/mixin -go 1.22.4 +go 1.22.5 replace github.com/dgraph-io/badger/v4 => github.com/MixinNetwork/badger/v4 v4.2.0-F1 diff --git a/p2p/handle.go b/p2p/handle.go index d920763a7..f3d66e220 100644 --- a/p2p/handle.go +++ b/p2p/handle.go @@ -464,9 +464,9 @@ func (me *Peer) relayOrHandlePeerMessage(relayerId crypto.Hash, msg *PeerMessage return nil } rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...)) - success := me.offerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data}) + success := me.offerToPeerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data}) if !success { - logger.Verbosef("me.offerWithCacheCheck(%s) relayer timeout\n", peer.IdForNetwork) + logger.Verbosef("me.offerToPeerWithCacheCheck(%s) relayer timeout\n", peer.IdForNetwork) } } return nil diff --git a/p2p/peer.go b/p2p/peer.go index 1e3eebc26..50bc21bd6 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -2,7 +2,6 @@ package p2p import ( "context" - "encoding/binary" "fmt" "net" "slices" @@ -13,7 +12,6 @@ import ( "github.com/MixinNetwork/mixin/crypto" "github.com/MixinNetwork/mixin/logger" "github.com/MixinNetwork/mixin/util" - "github.com/dgraph-io/ristretto" ) type Peer struct { @@ -399,7 +397,7 @@ func (me *Peer) sendHighToPeer(to crypto.Hash, typ byte, key, data []byte) error return me.sendToPeer(to, typ, key, data, MsgPriorityHigh) } -func (me *Peer) offerWithCacheCheck(p *Peer, priority int, msg *ChanMsg) bool { +func (me *Peer) offerToPeerWithCacheCheck(p *Peer, priority int, msg *ChanMsg) bool { if p.IdForNetwork == me.IdForNetwork { return true } @@ -451,9 +449,9 @@ func (me *Peer) sendToPeer(to crypto.Hash, typ byte, key, data []byte, priority panic(peer.IdForNetwork) } rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...)) - success := peer.offer(priority, &ChanMsg{rk[:], rm}) + success := me.offerToPeerWithCacheCheck(peer, priority, &ChanMsg{rk[:], rm}) if !success { - logger.Verbosef("peer.offer(%s) send timeout\n", peer.IdForNetwork) + logger.Verbosef("me.offerToPeerWithCacheCheck(%s) send timeout\n", peer.IdForNetwork) } } return nil @@ -465,41 +463,6 @@ func (me *Peer) sendSnapshotMessageToPeer(to crypto.Hash, snap crypto.Hash, typ return me.sendToPeer(to, typ, key, data, MsgPriorityNormal) } -type confirmMap struct { - cache *ristretto.Cache -} - -func (m *confirmMap) contains(key []byte, duration time.Duration) bool { - if key == nil { - return false - } - val, found := m.cache.Get(key) - if found { - ts := time.Unix(0, int64(binary.BigEndian.Uint64(val.([]byte)))) - return ts.Add(duration).After(time.Now()) - } - return false -} - -func (m *confirmMap) store(key []byte, ts time.Time) { - if key == nil { - panic(ts) - } - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, uint64(ts.UnixNano())) - m.cache.Set(key, buf, 8) -} - -type remoteRelayer struct { - Id crypto.Hash - ActiveAt time.Time -} - -type relayersMap struct { - sync.RWMutex - m map[crypto.Hash][]*remoteRelayer -} - func (me *Peer) GetNeighbor(key crypto.Hash) *Peer { p := me.relayers.Get(key) if p != nil { @@ -522,112 +485,3 @@ func (me *Peer) GetRemoteRelayers(key crypto.Hash) []*Peer { } return relayers } - -func (m *relayersMap) Get(key crypto.Hash) []crypto.Hash { - m.RLock() - defer m.RUnlock() - - var relayers []crypto.Hash - for _, r := range m.m[key] { - if r.ActiveAt.Add(time.Minute).Before(time.Now()) { - continue - } - relayers = append(relayers, r.Id) - } - return relayers -} - -func (m *relayersMap) Add(key crypto.Hash, v crypto.Hash) { - m.Lock() - defer m.Unlock() - - var relayers []*remoteRelayer - for _, r := range m.m[key] { - if r.ActiveAt.Add(time.Minute).After(time.Now()) { - relayers = append(relayers, r) - } - } - for _, r := range relayers { - if r.Id == v { - r.ActiveAt = time.Now() - return - } - } - i := slices.IndexFunc(relayers, func(r *remoteRelayer) bool { - return r.Id == v - }) - if i < 0 { - relayers = append(relayers, &remoteRelayer{ActiveAt: time.Now(), Id: v}) - } else { - relayers[i].ActiveAt = time.Now() - } - m.m[key] = relayers -} - -type neighborMap struct { - sync.RWMutex - m map[crypto.Hash]*Peer -} - -func (m *neighborMap) Get(key crypto.Hash) *Peer { - m.RLock() - defer m.RUnlock() - - return m.m[key] -} - -func (m *neighborMap) Delete(key crypto.Hash) { - m.Lock() - defer m.Unlock() - - delete(m.m, key) -} - -func (m *neighborMap) Set(key crypto.Hash, v *Peer) { - m.Lock() - defer m.Unlock() - - m.m[key] = v -} - -func (m *neighborMap) Put(key crypto.Hash, v *Peer) bool { - m.Lock() - defer m.Unlock() - - if m.m[key] != nil { - return false - } - m.m[key] = v - return true -} - -func (m *neighborMap) Slice() []*Peer { - m.Lock() - defer m.Unlock() - - var peers []*Peer - for _, p := range m.m { - peers = append(peers, p) - } - return peers -} - -func (m *neighborMap) Clear() { - m.Lock() - defer m.Unlock() - - for id := range m.m { - delete(m.m, id) - } -} - -func (m *neighborMap) RunOnce(key crypto.Hash, v *Peer, f func()) { - m.Lock() - defer m.Unlock() - - if m.m[key] != nil { - return - } - m.m[key] = v - go f() -} diff --git a/p2p/util.go b/p2p/util.go new file mode 100644 index 000000000..ced5e6170 --- /dev/null +++ b/p2p/util.go @@ -0,0 +1,155 @@ +package p2p + +import ( + "encoding/binary" + "slices" + "sync" + "time" + + "github.com/MixinNetwork/mixin/crypto" + "github.com/dgraph-io/ristretto" +) + +type confirmMap struct { + cache *ristretto.Cache +} + +func (m *confirmMap) contains(key []byte, duration time.Duration) bool { + if key == nil { + return false + } + val, found := m.cache.Get(key) + if found { + ts := time.Unix(0, int64(binary.BigEndian.Uint64(val.([]byte)))) + return ts.Add(duration).After(time.Now()) + } + return false +} + +func (m *confirmMap) store(key []byte, ts time.Time) { + if key == nil { + panic(ts) + } + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(ts.UnixNano())) + m.cache.Set(key, buf, 8) +} + +type remoteRelayer struct { + Id crypto.Hash + ActiveAt time.Time +} + +type relayersMap struct { + sync.RWMutex + m map[crypto.Hash][]*remoteRelayer +} + +func (m *relayersMap) Get(key crypto.Hash) []crypto.Hash { + m.RLock() + defer m.RUnlock() + + var relayers []crypto.Hash + for _, r := range m.m[key] { + if r.ActiveAt.Add(time.Minute).Before(time.Now()) { + continue + } + relayers = append(relayers, r.Id) + } + return relayers +} + +func (m *relayersMap) Add(key crypto.Hash, v crypto.Hash) { + m.Lock() + defer m.Unlock() + + var relayers []*remoteRelayer + for _, r := range m.m[key] { + if r.ActiveAt.Add(time.Minute).After(time.Now()) { + relayers = append(relayers, r) + } + } + for _, r := range relayers { + if r.Id == v { + r.ActiveAt = time.Now() + return + } + } + i := slices.IndexFunc(relayers, func(r *remoteRelayer) bool { + return r.Id == v + }) + if i < 0 { + relayers = append(relayers, &remoteRelayer{ActiveAt: time.Now(), Id: v}) + } else { + relayers[i].ActiveAt = time.Now() + } + m.m[key] = relayers +} + +type neighborMap struct { + sync.RWMutex + m map[crypto.Hash]*Peer +} + +func (m *neighborMap) Get(key crypto.Hash) *Peer { + m.RLock() + defer m.RUnlock() + + return m.m[key] +} + +func (m *neighborMap) Delete(key crypto.Hash) { + m.Lock() + defer m.Unlock() + + delete(m.m, key) +} + +func (m *neighborMap) Set(key crypto.Hash, v *Peer) { + m.Lock() + defer m.Unlock() + + m.m[key] = v +} + +func (m *neighborMap) Put(key crypto.Hash, v *Peer) bool { + m.Lock() + defer m.Unlock() + + if m.m[key] != nil { + return false + } + m.m[key] = v + return true +} + +func (m *neighborMap) Slice() []*Peer { + m.Lock() + defer m.Unlock() + + var peers []*Peer + for _, p := range m.m { + peers = append(peers, p) + } + return peers +} + +func (m *neighborMap) Clear() { + m.Lock() + defer m.Unlock() + + for id := range m.m { + delete(m.m, id) + } +} + +func (m *neighborMap) RunOnce(key crypto.Hash, v *Peer, f func()) { + m.Lock() + defer m.Unlock() + + if m.m[key] != nil { + return + } + m.m[key] = v + go f() +}