diff --git a/go.mod b/go.mod index 0541b7c012..c7c274acbc 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/decred/dcrd/gcs/v4 v4.0.0 github.com/decred/dcrd/lru v1.1.2 github.com/decred/dcrd/math/uint256 v1.0.1 + github.com/decred/dcrd/mixing v0.0.1 github.com/decred/dcrd/peer/v3 v3.0.2 github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.0.0 github.com/decred/dcrd/rpcclient/v8 v8.0.0 @@ -38,18 +39,20 @@ require ( github.com/jrick/bitset v1.0.0 github.com/jrick/logrotate v1.0.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - golang.org/x/sys v0.8.0 - golang.org/x/term v0.5.0 + golang.org/x/sys v0.20.0 + golang.org/x/term v0.20.0 lukechampine.com/blake3 v1.3.0 ) require ( github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect + github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a // indirect github.com/dchest/siphash v1.2.3 // indirect github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect github.com/decred/dcrd/hdkeychain/v3 v3.1.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect + golang.org/x/crypto v0.23.0 // indirect ) replace ( @@ -75,6 +78,7 @@ replace ( github.com/decred/dcrd/limits => ./limits github.com/decred/dcrd/lru => ./lru github.com/decred/dcrd/math/uint256 => ./math/uint256 + github.com/decred/dcrd/mixing => ./mixing github.com/decred/dcrd/peer/v3 => ./peer github.com/decred/dcrd/rpc/jsonrpc/types/v4 => ./rpc/jsonrpc/types github.com/decred/dcrd/rpcclient/v8 => ./rpcclient diff --git a/go.sum b/go.sum index 084039147a..28f945bf4b 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ +decred.org/cspp/v2 v2.2.0 h1:VSOUC1w0Wo+QOGS0r1XO6TLnO16X67KuvpDmRRYyr08= +decred.org/cspp/v2 v2.2.0/go.mod h1:9nO3bfvCheOPIFZw5f6sRQ42CjBFB5RKSaJ9Iq6G4MA= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0= +github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a h1:clYxJ3Os0EQUKDDVU8M0oipllX0EkuFNBfhVQuIfyF0= +github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a/go.mod h1:z/9Ck1EDixEbBbZ2KH2qNHekEmDLTOZ+FyoIPWWSVOI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= @@ -53,11 +57,13 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc h1:zK/HqS5bZxDptfPJNq8v7vJfXtkU7r9TLIoSr1bXaP4= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -69,14 +75,14 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= diff --git a/internal/blockchain/chain.go b/internal/blockchain/chain.go index 1c897e78ef..8a36c8763e 100644 --- a/internal/blockchain/chain.go +++ b/internal/blockchain/chain.go @@ -297,6 +297,11 @@ type BlockChain struct { bulkImportMode bool } +// ChainParams returns the chain parameters. +func (b *BlockChain) ChainParams() *chaincfg.Params { + return b.chainParams +} + const ( // stakeMajorityCacheKeySize is comprised of the stake version and the // hash size. The stake version is a little endian uint32, hence we diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 5287c4ff6f..503bedf945 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -1084,6 +1084,22 @@ func (mp *TxPool) fetchInputUtxos(tx *dcrutil.Tx, isTreasuryEnabled bool) (*bloc return utxoView, nil } +// IsSpent returns whether the outpoint is spent by any transient or staged +// transaction in the tx pool. Outpoints that are not spent by transactions +// in the pool may not exist or may be spent in blocks. +func (mp *TxPool) IsSpent(outpoint wire.OutPoint) bool { + mp.mtx.RLock() + defer mp.mtx.RUnlock() + + _, exists := mp.outpoints[outpoint] + if exists { + return true + } + + _, exists = mp.stagedOutpoints[outpoint] + return exists +} + // FetchTransaction returns the requested transaction from the transaction pool. // This only fetches from the main and stage transaction pools and does not // include orphans. diff --git a/internal/netsync/interface.go b/internal/netsync/interface.go index 7a930550a8..cad185c62c 100644 --- a/internal/netsync/interface.go +++ b/internal/netsync/interface.go @@ -6,6 +6,7 @@ package netsync import ( "github.com/decred/dcrd/dcrutil/v4" + "github.com/decred/dcrd/mixing" ) // PeerNotifier provides an interface to notify peers of status changes related @@ -14,4 +15,8 @@ type PeerNotifier interface { // AnnounceNewTransactions generates and relays inventory vectors and // notifies websocket clients of the passed transactions. AnnounceNewTransactions(txns []*dcrutil.Tx) + + // AnnounceMixMessage generates and relays inventory vectors of the + // passed messages. + AnnounceMixMessages(msgs []mixing.Message) } diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 975ee32edf..9c8f5d0d6a 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -23,6 +23,8 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/progresslog" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" peerpkg "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/wire" ) @@ -64,6 +66,14 @@ const ( maxRejectedTxns = 62500 rejectedTxnsFPRate = 0.0000001 + // maxRejectedMixMsgs specifies the maximum number of recently + // rejected mixing messages to track, and rejectedMixMsgsFPRate is the + // false positive rate for the APBF. These values have not been tuned + // specifically for the mixing messages, and the equivalent constants + // for handling rejected transactions are used. + maxRejectedMixMsgs = maxRejectedTxns + rejectedMixMsgsFPRate = rejectedTxnsFPRate + // maxRequestedBlocks is the maximum number of requested block // hashes to store in memory. maxRequestedBlocks = wire.MaxInvPerMsg @@ -72,6 +82,10 @@ const ( // hashes to store in memory. maxRequestedTxns = wire.MaxInvPerMsg + // maxRequestedMixMsgs is the maximum number of hashes of in-flight + // mixing messages. + maxRequestedMixMsgs = wire.MaxInvPerMsg + // maxExpectedHeaderAnnouncementsPerMsg is the maximum number of headers in // a single message that is expected when determining when the message // appears to be announcing new blocks. @@ -153,6 +167,7 @@ type requestFromPeerMsg struct { blocks []chainhash.Hash voteHashes []chainhash.Hash tSpendHashes []chainhash.Hash + mixHashes []chainhash.Hash reply chan requestFromPeerResponse } @@ -179,15 +194,24 @@ type processBlockMsg struct { reply chan processBlockResponse } +// mixMsg is a message type to be sent across the message channel for requesting +// a message's acceptance to the mixing pool. +type mixMsg struct { + msg mixing.Message + peer *Peer + reply chan error +} + // Peer extends a common peer to maintain additional state needed by the sync // manager. The internals are intentionally unexported to create an opaque // type. type Peer struct { *peerpkg.Peer - syncCandidate bool - requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} + syncCandidate bool + requestedTxns map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]struct{} + requestedMixMsgs map[chainhash.Hash]struct{} // initialStateRequested tracks whether or not the initial state data has // been requested from the peer. @@ -207,10 +231,11 @@ type Peer struct { func NewPeer(peer *peerpkg.Peer) *Peer { isSyncCandidate := peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork return &Peer{ - Peer: peer, - syncCandidate: isSyncCandidate, - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), + Peer: peer, + syncCandidate: isSyncCandidate, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedMixMsgs: make(map[chainhash.Hash]struct{}), } } @@ -291,13 +316,15 @@ type SyncManager struct { // time. minKnownWork *uint256.Uint256 - rejectedTxns *apbf.Filter - requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} - progressLogger *progresslog.Logger - syncPeer *Peer - msgChan chan interface{} - peers map[*Peer]struct{} + rejectedTxns *apbf.Filter + rejectedMixMsgs *apbf.Filter + requestedTxns map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]struct{} + requestedMixMsgs map[chainhash.Hash]struct{} + progressLogger *progresslog.Logger + syncPeer *Peer + msgChan chan interface{} + peers map[*Peer]struct{} // hdrSyncState houses the state used to track the initial header sync // process and related stall handling. @@ -666,6 +693,22 @@ BlockHashes: // No peers found that have announced this data. delete(m.requestedBlocks, blockHash) } + inv.Type = wire.InvTypeMix +MixHashes: + for mixHash := range peer.requestedMixMsgs { + inv.Hash = mixHash + for pp := range m.peers { + if !pp.IsKnownInventory(&inv) { + continue + } + invs := append(requestQueues[pp], inv) + requestQueues[pp] = invs + pp.requestedMixMsgs[mixHash] = struct{}{} + continue MixHashes + } + // No peers found that have announced this data. + delete(m.requestedMixMsgs, mixHash) + } for pp, requestQueue := range requestQueues { var numRequested int32 gdmsg := wire.NewMsgGetData() @@ -754,6 +797,66 @@ func (m *SyncManager) handleTxMsg(tmsg *txMsg) { m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) } +// handleMixMsg handles mixing messages from all peers. +func (m *SyncManager) handleMixMsg(mmsg *mixMsg) error { + peer := mmsg.peer + + mixHash := mmsg.msg.Hash() + + // Ignore transactions that have already been rejected. The transaction was + // unsolicited if it was already previously rejected. + if m.rejectedMixMsgs.Contains(mixHash[:]) { + log.Debugf("Ignoring unsolicited previously rejected mix message %v "+ + "from %s", &mixHash, peer) + return nil + } + + accepted, err := m.cfg.MixPool.AcceptMessage(mmsg.msg) + + // Remove message from request maps. Either the mixpool already knows + // about it and as such we shouldn't have any more instances of trying + // to fetch it, or we failed to insert and thus we'll retry next time + // we get an inv. + delete(peer.requestedMixMsgs, mixHash) + delete(m.requestedMixMsgs, mixHash) + + if err != nil { + // Do not request this message again until a new block has + // been processed. If the message is an orphan KE, it is + // tracked internally by mixpool as an orphan; there is no + // need to request it again after requesting the unknown PR. + m.rejectedMixMsgs.Add(mixHash[:]) + + // When the error is a rule error, it means the message was + // simply rejected as opposed to something actually going wrong, + // so log it as such. + // + // When the error is an orphan KE with unknown PR, the PR will be + // requested from the peer submitting the KE. This is a normal + // occurrence, and will be logged at debug instead at error level. + // + // Otherwise, something really did go wrong, so log it as an + // actual error. + var rErr *mixpool.RuleError + var missingPRErr *mixpool.MissingOwnPRError + if errors.As(err, &rErr) || errors.As(err, &missingPRErr) { + log.Debugf("Rejected %T mixing message %v from %s: %v", + mmsg.msg, &mixHash, peer, err) + } else { + log.Errorf("Failed to process %T mixing message %v: %v", + mmsg.msg, &mixHash, err) + } + return err + } + + if len(accepted) == 0 { + return nil + } + + m.cfg.PeerNotifier.AnnounceMixMessages(accepted) + return nil +} + // maybeUpdateIsCurrent potentially updates the manager to signal it believes // the chain is considered synced. // @@ -903,6 +1006,12 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { // Clear the rejected transactions. m.rejectedTxns.Reset() + + // Remove expired pair requests and completed mixes from + // mixpool. + m.cfg.MixPool.RemoveSpentPRs(msgBlock.Transactions) + m.cfg.MixPool.RemoveSpentPRs(msgBlock.STransactions) + m.cfg.MixPool.ExpireMessagesInBackground(header.Height) } // Update the latest block height for the peer to avoid stale heights when @@ -1275,6 +1384,24 @@ func (m *SyncManager) needTx(hash *chainhash.Hash) bool { return true } +// needMixMsg returns whether or not the mixing message needs to be downloaded. +func (m *SyncManager) needMixMsg(hash *chainhash.Hash) bool { + if m.rejectedMixMsgs.Contains(hash[:]) { + return false + } + + if m.cfg.MixPool.HaveMessage(hash) { + return false + } + + // TODO: It would be ideal here to not 'need' previously-observed + // messages that are known/expected to fail validation, or messages + // that have already been removed from mixpool. An LRU of recently + // removed mixpool messages may work well. + + return true +} + // handleInvMsg handles inv messages from all peers. This entails examining the // inventory advertised by the remote peer for block and transaction // announcements and acting accordingly. @@ -1332,6 +1459,29 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns) requestQueue = append(requestQueue, iv) } + + case wire.InvTypeMix: + // Add the mix message to the cache of known inventory + // for the peer. This helps avoid sending mix messages + // to the peer that it is already known to have. + peer.AddKnownInventory(iv) + + // Ignore mixing messages before the chain is current or + // if the messages are not needed. Pair request (PR) + // messages reference unspent outputs that must be + // checked to exist and be unspent before they are + // accepted, and all later messages must reference an + // existing PR recorded in the mixing pool. + if !isCurrent || !m.needMixMsg(&iv.Hash) { + continue + } + + // Request the mixing message if it is not already pending. + if _, exists := m.requestedMixMsgs[iv.Hash]; !exists { + limitAdd(m.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs) + limitAdd(peer.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs) + requestQueue = append(requestQueue, iv) + } } } @@ -1420,6 +1570,13 @@ out: case <-ctx.Done(): } + case *mixMsg: + err := m.handleMixMsg(msg) + select { + case msg.reply <- err: + case <-ctx.Done(): + } + case *invMsg: m.handleInvMsg(msg) @@ -1441,7 +1598,7 @@ out: case requestFromPeerMsg: err := m.requestFromPeer(msg.peer, msg.blocks, msg.voteHashes, - msg.tSpendHashes) + msg.tSpendHashes, msg.mixHashes) msg.reply <- requestFromPeerResponse{ err: err, } @@ -1538,6 +1695,15 @@ func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *Peer) { } } +// QueueMixMsg adds the passed mixing message and peer to the event handling +// queue. +func (m *SyncManager) QueueMixMsg(msg mixing.Message, peer *Peer, done chan error) { + select { + case m.msgChan <- &mixMsg{msg: msg, peer: peer, reply: done}: + case <-m.quit: + } +} + // QueueNotFound adds the passed notfound message and peer to the event handling // queue. func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *Peer) { @@ -1575,7 +1741,7 @@ func (m *SyncManager) SyncPeerID() int32 { // from a peer. The requests are logged in the internal map of requests so the // peer is not later banned for sending the respective data. func (m *SyncManager) RequestFromPeer(p *Peer, blocks, voteHashes, - tSpendHashes []chainhash.Hash) error { + tSpendHashes, mixHashes []chainhash.Hash) error { reply := make(chan requestFromPeerResponse, 1) request := requestFromPeerMsg{ @@ -1583,6 +1749,7 @@ func (m *SyncManager) RequestFromPeer(p *Peer, blocks, voteHashes, blocks: blocks, voteHashes: voteHashes, tSpendHashes: tSpendHashes, + mixHashes: mixHashes, reply: reply, } select { @@ -1599,7 +1766,7 @@ func (m *SyncManager) RequestFromPeer(p *Peer, blocks, voteHashes, } func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, - tSpendHashes []chainhash.Hash) error { + tSpendHashes, mixHashes []chainhash.Hash) error { // Add the blocks to the request. msgResp := wire.NewMsgGetData() @@ -1706,6 +1873,32 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, return err } + for i := range mixHashes { + // If we've already requested this mix message, skip it. + mh := &mixHashes[i] + _, alreadyReqP := peer.requestedMixMsgs[*mh] + _, alreadyReqB := m.requestedMixMsgs[*mh] + + if alreadyReqP || alreadyReqB { + continue + } + + // Skip the message when it is already known. + if m.cfg.MixPool.HaveMessage(mh) { + continue + } + + err := msgResp.AddInvVect(wire.NewInvVect(wire.InvTypeMix, mh)) + if err != nil { + return fmt.Errorf("unexpected error encountered building request "+ + "for inv vect mix hash %v: %v", + mh, err.Error()) + } + + peer.requestedMixMsgs[*mh] = struct{}{} + m.requestedMixMsgs[*mh] = struct{}{} + } + if len(msgResp.InvList) > 0 { peer.QueueMessage(msgResp, nil) } @@ -1803,6 +1996,10 @@ type Config struct { // and querying the most recently confirmed transactions. It is useful for // preventing duplicate requests. RecentlyConfirmedTxns *apbf.Filter + + // MixPool specifies the mixing pool to use for transient mixing + // messages broadcast across the network. + MixPool *mixpool.Pool } // New returns a new network chain synchronization manager. Use Run to begin @@ -1819,17 +2016,19 @@ func New(config *Config) *SyncManager { } return &SyncManager{ - cfg: *config, - rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - peers: make(map[*Peer]struct{}), - minKnownWork: minKnownWork, - hdrSyncState: makeHeaderSyncState(), - progressLogger: progresslog.New("Processed", log), - msgChan: make(chan interface{}, config.MaxPeers*3), - quit: make(chan struct{}), - syncHeight: config.Chain.BestSnapshot().Height, - isCurrent: config.Chain.IsCurrent(), + cfg: *config, + rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), + rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate), + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedMixMsgs: make(map[chainhash.Hash]struct{}), + peers: make(map[*Peer]struct{}), + minKnownWork: minKnownWork, + hdrSyncState: makeHeaderSyncState(), + progressLogger: progresslog.New("Processed", log), + msgChan: make(chan interface{}, config.MaxPeers*3), + quit: make(chan struct{}), + syncHeight: config.Chain.BestSnapshot().Height, + isCurrent: config.Chain.IsCurrent(), } } diff --git a/internal/rpcserver/interface.go b/internal/rpcserver/interface.go index 304ad83909..250e6ef3df 100644 --- a/internal/rpcserver/interface.go +++ b/internal/rpcserver/interface.go @@ -19,6 +19,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -129,6 +130,10 @@ type ConnManager interface { // the passed transactions to all connected peers. RelayTransactions(txns []*dcrutil.Tx) + // RelayMixMessages generates and relays inventory vectors for all of + // the passed mixing messages to all connected peers. + RelayMixMessages(msgs []mixing.Message) + // Lookup defines the DNS lookup function to be used. Lookup(host string) ([]net.IP, error) } @@ -162,6 +167,10 @@ type SyncManager interface { // // This method may report a false positive, but never a false negative. RecentlyConfirmedTxn(hash *chainhash.Hash) bool + + // SubmitMixMessage submits the mixing message to the network after + // processing it locally. + SubmitMixMessage(msg mixing.Message) error } // UtxoEntry represents a utxo entry for use with the RPC server. @@ -613,6 +622,22 @@ type TxMempooler interface { TSpendHashes() []chainhash.Hash } +// MixPooler represents a source of mixpool message data for the RPC server. +// +// The interface contract requires that all of these methods are safe for +// concurrent access. +type MixPooler interface { + // MixPRs returns all MixPR messages with hashes matching the query. + // Unknown messages are ignored. + // + // If query is nil, all PRs are returned. + MixPRs(query []chainhash.Hash) []*wire.MsgMixPairReq + + // RemoveConfirmedRuns removes all messages including pair requests + // from runs which ended in each peer sending a confirm mix message. + RemoveConfirmedRuns() +} + // TxIndexer provides an interface for retrieving details for a given // transaction hash. // @@ -671,6 +696,10 @@ type NtfnManager interface { // manager for processing. NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) + // NotifyMixMessage passes a mixing message accepted by the mixpool to the + // notification manager for message broadcasting. + NotifyMixMessage(msg mixing.Message) + // NumClients returns the number of clients actively being served. NumClients() int @@ -722,6 +751,14 @@ type NtfnManager interface { // client when new transaction are added to the memory pool. UnregisterNewMempoolTxsUpdates(wsc *wsClient) + // RegisterMixMessages requests notifications to the passed websocket + // client when new mixing messages are accepted by the mixpool. + RegisterMixMessages(wsc *wsClient) + + // UnregisterMixMessages stops notifications to the websocket client + // of any newly-accepted mixing messages. + UnregisterMixMessages(wsc *wsClient) + // AddClient adds the passed websocket client to the notification manager. AddClient(wsc *wsClient) diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index 6a9c52330d..ff7e2c53ea 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -51,6 +51,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/version" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -61,7 +62,7 @@ import ( // API version constants const ( jsonrpcSemverMajor = 8 - jsonrpcSemverMinor = 1 + jsonrpcSemverMinor = 2 jsonrpcSemverPatch = 0 ) @@ -208,6 +209,7 @@ var rpcHandlersBeforeInit = map[types.Method]commandHandler{ "getinfo": handleGetInfo, "getmempoolinfo": handleGetMempoolInfo, "getmininginfo": handleGetMiningInfo, + "getmixpairrequests": handleGetMixPairRequests, "getnettotals": handleGetNetTotals, "getnetworkhashps": handleGetNetworkHashPS, "getnetworkinfo": handleGetNetworkInfo, @@ -231,6 +233,7 @@ var rpcHandlersBeforeInit = map[types.Method]commandHandler{ "ping": handlePing, "reconsiderblock": handleReconsiderBlock, "regentemplate": handleRegenTemplate, + "sendrawmixmessage": handleSendRawMixMessage, "sendrawtransaction": handleSendRawTransaction, "setgenerate": handleSetGenerate, "stop": handleStop, @@ -338,6 +341,7 @@ var rpcUnimplemented = map[string]struct{}{ var rpcLimited = map[string]struct{}{ // Websockets commands "notifyblocks": {}, + "notifymixmessages": {}, "notifynewtransactions": {}, "rescan": {}, "session": {}, @@ -375,6 +379,7 @@ var rpcLimited = map[string]struct{}{ "getdifficulty": {}, "getheaders": {}, "getinfo": {}, + "getmixpairrequests": {}, "getnettotals": {}, "getnetworkhashps": {}, "getnetworkinfo": {}, @@ -388,6 +393,7 @@ var rpcLimited = map[string]struct{}{ "getvoteinfo": {}, "livetickets": {}, "regentemplate": {}, + "sendrawmixmessage": {}, "sendrawtransaction": {}, "submitblock": {}, "ticketfeeinfo": {}, @@ -2556,6 +2562,30 @@ func handleGetMiningInfo(ctx context.Context, s *Server, _ interface{}) (interfa return &result, nil } +// handleGetMixPairRequests implements the getmixpairrequests command, +// returning all current mixing pair requests messages from mixpool. +func handleGetMixPairRequests(_ context.Context, s *Server, _ interface{}) (interface{}, error) { + mp := s.cfg.MixPooler + + mp.RemoveConfirmedRuns() // XXX: a bit hacky to do this here + prs := mp.MixPRs(nil) + + buf := new(strings.Builder) + res := make([]string, 0, len(prs)) + + const pver = wire.MixVersion + for _, pr := range prs { + err := pr.BtcEncode(hex.NewEncoder(buf), pver) + if err != nil { + return nil, err + } + res = append(res, buf.String()) + buf.Reset() + } + + return res, nil +} + // handleGetNetTotals implements the getnettotals command. func handleGetNetTotals(_ context.Context, s *Server, _ interface{}) (interface{}, error) { totalBytesRecv, totalBytesSent := s.cfg.ConnMgr.NetTotals() @@ -4224,6 +4254,65 @@ func handleRegenTemplate(_ context.Context, s *Server, _ interface{}) (interface return nil, nil } +// handleSendRawMixMessage implements the sendrawmixmessage command. +func handleSendRawMixMessage(_ context.Context, s *Server, icmd interface{}) (interface{}, error) { + c := icmd.(*types.SendRawMixMessageCmd) + + // Allocate a message of the appropriate type based on the wire + // command string. + var msg mixing.Message + switch c.Command { + case wire.CmdMixPairReq: + msg = new(wire.MsgMixPairReq) + case wire.CmdMixKeyExchange: + msg = new(wire.MsgMixKeyExchange) + case wire.CmdMixCiphertexts: + msg = new(wire.MsgMixCiphertexts) + case wire.CmdMixSlotReserve: + msg = new(wire.MsgMixSlotReserve) + case wire.CmdMixDCNet: + msg = new(wire.MsgMixDCNet) + case wire.CmdMixConfirm: + msg = new(wire.MsgMixConfirm) + case wire.CmdMixFactoredPoly: + msg = new(wire.MsgMixFactoredPoly) + case wire.CmdMixSecrets: + msg = new(wire.MsgMixSecrets) + default: + err := rpcInvalidError("Unrecognized mixing message "+ + "wire command string %q", c.Command) + return nil, err + } + + // Deserialize message. + err := msg.BtcDecode(hex.NewDecoder(strings.NewReader(c.Message)), + wire.MixVersion) + if err != nil { + return nil, rpcDeserializationError("Could not decode mix "+ + "message: %v", err) + } + + // Message hash of the freshly-deserialized mixing message needs to be + // pre-calculated. Calculate this here now; it is required before it + // can be accepted and notified. + s.blake256HaserMu.Lock() + msg.WriteHash(s.blake256Hasher) + s.blake256HaserMu.Unlock() + + err = s.cfg.SyncMgr.SubmitMixMessage(msg) + if err != nil { + // XXX: consider a better error code/function + str := fmt.Sprintf("Rejected mix message: %s", err) + return nil, rpcMiscError(str) + } + + s.cfg.ConnMgr.RelayMixMessages([]mixing.Message{msg}) + + s.ntfnMgr.NotifyMixMessage(msg) + + return nil, nil +} + // handleSendRawTransaction implements the sendrawtransaction command. func handleSendRawTransaction(_ context.Context, s *Server, cmd interface{}) (interface{}, error) { c := cmd.(*types.SendRawTransactionCmd) @@ -5017,6 +5106,12 @@ type Server struct { workState *workState helpCacher RPCHelpCacher requestProcessShutdown chan struct{} + + // blake256Hasher is the hash.Hash object that is used after + // deserializing mixing messages. Message handlers may be executed + // concurrently, and access requires the mutex. + blake256Hasher hash.Hash + blake256HaserMu sync.Mutex } // isTreasuryAgendaActive returns if the treasury agenda is active or not for @@ -5163,6 +5258,14 @@ func (s *Server) NotifyTSpend(tx *dcrutil.Tx) { s.ntfnMgr.NotifyTSpend(tx) } +// NotifyMixMessages notifies websocket clients that have registered to +// receive mixing message notifications of newly accepted mix messages. +func (s *Server) NotifyMixMessages(msgs []mixing.Message) { + for _, msg := range msgs { + s.ntfnMgr.NotifyMixMessage(msg) + } +} + // NotifyNewTickets notifies websocket clients that have registered for maturing // ticket updates. func (s *Server) NotifyNewTickets(tnd *blockchain.TicketNotificationsData) { @@ -5977,6 +6080,9 @@ type Config struct { // FiltererV2 defines the V2 filterer for the RPC server to use. FiltererV2 FiltererV2 + + // MixPooler defines the mixpool for the RPC server to use. + MixPooler MixPooler } // New returns a new instance of the Server struct. @@ -5987,6 +6093,7 @@ func New(config *Config) (*Server, error) { workState: newWorkState(), helpCacher: newHelpCacher(), requestProcessShutdown: make(chan struct{}), + blake256Hasher: blake256.New(), } key := make([]byte, 32) _, err := io.ReadFull(rand.Reader, key) diff --git a/internal/rpcserver/rpcserverhandlers_test.go b/internal/rpcserver/rpcserverhandlers_test.go index 2e6f36dcd9..4728b8e7e6 100644 --- a/internal/rpcserver/rpcserverhandlers_test.go +++ b/internal/rpcserver/rpcserverhandlers_test.go @@ -41,6 +41,7 @@ import ( "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4" @@ -531,6 +532,7 @@ func (c *testAddrManager) LocalAddresses() []addrmgr.LocalAddr { type testSyncManager struct { isCurrent bool submitBlockErr error + submitMixErr error syncPeerID int32 syncHeight int64 processTransaction []*dcrutil.Tx @@ -550,6 +552,10 @@ func (s *testSyncManager) SubmitBlock(block *dcrutil.Block) error { return s.submitBlockErr } +func (s *testSyncManager) SubmitMixMessage(msg mixing.Message) error { + return s.submitMixErr +} + // SyncPeer returns a mocked id of the current peer being synced with. func (s *testSyncManager) SyncPeerID() int32 { return s.syncPeerID @@ -869,6 +875,10 @@ func (c *testConnManager) AddRebroadcastInventory(iv *wire.InvVect, data interfa // inventory vectors for all of the passed transactions to all connected peers. func (c *testConnManager) RelayTransactions(txns []*dcrutil.Tx) {} +// RelayMixMessages generates and relays inventory vectors for all of +// the passed mixing messages to all connected peers. +func (c *testConnManager) RelayMixMessages(msgs []mixing.Message) {} + // Lookup defines a mocked DNS lookup function to be used. func (c *testConnManager) Lookup(host string) ([]net.IP, error) { return c.lookup(host) @@ -1161,6 +1171,10 @@ func (mgr *testNtfnManager) NotifyNewTickets(tnd *blockchain.TicketNotifications // manager for processing. func (mgr *testNtfnManager) NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) {} +// NotifyMixMessage passes a mixing message accepted by the mixpool to the +// notification manager for message broadcasting. +func (mgr *testNtfnManager) NotifyMixMessage(msg mixing.Message) {} + // NumClients returns the number of clients actively being served. func (mgr *testNtfnManager) NumClients() int { return mgr.clients @@ -1222,6 +1236,14 @@ func (mgr *testNtfnManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) {} // client when new transaction are added to the memory pool. func (mgr *testNtfnManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) {} +// RegisterMixMessages requests notifications to the passed websocket +// client when new mixing messages are accepted by the mixpool. +func (mgr *testNtfnManager) RegisterMixMessages(wsc *wsClient) {} + +// UnregisterMixMessages stops notifications to the websocket client +// of any newly-accepted mixing messages. +func (mgr *testNtfnManager) UnregisterMixMessages(wsc *wsClient) {} + // AddClient adds the passed websocket client to the notification manager. func (mgr *testNtfnManager) AddClient(wsc *wsClient) {} diff --git a/internal/rpcserver/rpcserverhelp.go b/internal/rpcserver/rpcserverhelp.go index 24c6dc7f12..b28257383f 100644 --- a/internal/rpcserver/rpcserverhelp.go +++ b/internal/rpcserver/rpcserverhelp.go @@ -510,6 +510,10 @@ var helpDescsEnUS = map[string]string{ // GetMiningInfoCmd help. "getmininginfo--synopsis": "Returns a JSON object containing mining-related information.", + // GetMixPairRequests help. + "getmixpairrequests--synopsis": "Returns current set of mixing pair request messages from mixpool.", + "getmixpairrequests--result0": "JSON array of hex-encoded mixing pair request messages.", + // GetNetworkHashPSCmd help. "getnetworkhashps--synopsis": "Returns the estimated network hashes per second for the block heights provided by the parameters.", "getnetworkhashps-blocks": "The number of blocks or -1 for the default number of blocks", @@ -789,6 +793,14 @@ var helpDescsEnUS = map[string]string{ // StopNotifyNewTransactionsCmd help. "stopnotifynewtransactions--synopsis": "Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", + "notifymixmessages--synopsis": "Request notifications for whenever mixing messages are accepted to the mixpool.", + + "stopnotifymixmessages--synopsis": "Cancel registered notifications for whenever mixing messages are accepted to the mixpool.", + + "sendrawmixmessage--synopsis": "Submit a mixing message to the mixpool and broadcast it to the network and all peers", + "sendrawmixmessage-message": "Mixing message serialized and encoded as hex", + "sendrawmixmessage-command": "The wire command name of the message type", + // OutPoint help. "outpoint-hash": "The hex-encoded bytes of the outpoint hash", "outpoint-index": "The index of the outpoint", @@ -922,8 +934,8 @@ var helpDescsEnUS = map[string]string{ // pointer to the type (or nil to indicate no return value). var rpcResultTypes = map[types.Method][]interface{}{ "addnode": nil, - "createrawsstx": {(*string)(nil)}, "createrawssrtx": {(*string)(nil)}, + "createrawsstx": {(*string)(nil)}, "createrawtransaction": {(*string)(nil)}, "debuglevel": {(*string)(nil), (*string)(nil)}, "decoderawtransaction": {(*types.TxRawDecodeResult)(nil)}, @@ -936,9 +948,9 @@ var rpcResultTypes = map[types.Method][]interface{}{ "existsliveticket": {(*bool)(nil)}, "existslivetickets": {(*string)(nil)}, "existsmempooltxs": {(*string)(nil)}, + "generate": {(*[]string)(nil)}, "getaddednodeinfo": {(*[]string)(nil), (*[]types.GetAddedNodeInfoResult)(nil)}, "getbestblock": {(*types.GetBestBlockResult)(nil)}, - "generate": {(*[]string)(nil)}, "getbestblockhash": {(*string)(nil)}, "getblock": {(*string)(nil), (*types.GetBlockVerboseResult)(nil)}, "getblockchaininfo": {(*types.GetBlockChainInfoResult)(nil)}, @@ -948,24 +960,26 @@ var rpcResultTypes = map[types.Method][]interface{}{ "getblocksubsidy": {(*types.GetBlockSubsidyResult)(nil)}, "getcfilterv2": {(*types.GetCFilterV2Result)(nil)}, "getchaintips": {(*[]types.GetChainTipsResult)(nil)}, + "getcoinsupply": {(*int64)(nil)}, "getconnectioncount": {(*int32)(nil)}, "getcurrentnet": {(*uint32)(nil)}, "getdifficulty": {(*float64)(nil)}, - "getstakedifficulty": {(*types.GetStakeDifficultyResult)(nil)}, - "getstakeversioninfo": {(*types.GetStakeVersionInfoResult)(nil)}, - "getstakeversions": {(*types.GetStakeVersionsResult)(nil)}, "getgenerate": {(*bool)(nil)}, "gethashespersec": {(*float64)(nil)}, "getheaders": {(*types.GetHeadersResult)(nil)}, "getinfo": {(*types.InfoChainResult)(nil)}, "getmempoolinfo": {(*types.GetMempoolInfoResult)(nil)}, "getmininginfo": {(*types.GetMiningInfoResult)(nil)}, + "getmixpairrequests": {(*[]string)(nil)}, "getnettotals": {(*types.GetNetTotalsResult)(nil)}, "getnetworkhashps": {(*int64)(nil)}, "getnetworkinfo": {(*[]types.GetNetworkInfoResult)(nil)}, "getpeerinfo": {(*[]types.GetPeerInfoResult)(nil)}, "getrawmempool": {(*[]string)(nil), (*types.GetRawMempoolVerboseResult)(nil)}, "getrawtransaction": {(*string)(nil), (*types.TxRawResult)(nil)}, + "getstakedifficulty": {(*types.GetStakeDifficultyResult)(nil)}, + "getstakeversioninfo": {(*types.GetStakeVersionInfoResult)(nil)}, + "getstakeversions": {(*types.GetStakeVersionsResult)(nil)}, "getticketpoolvalue": {(*float64)(nil)}, "gettreasurybalance": {(*types.GetTreasuryBalanceResult)(nil)}, "gettreasuryspendvotes": {(*types.GetTreasurySpendVotesResult)(nil)}, @@ -973,7 +987,6 @@ var rpcResultTypes = map[types.Method][]interface{}{ "gettxoutsetinfo": {(*types.GetTxOutSetInfoResult)(nil)}, "getvoteinfo": {(*types.GetVoteInfoResult)(nil)}, "getwork": {(*types.GetWorkResult)(nil), (*bool)(nil)}, - "getcoinsupply": {(*int64)(nil)}, "help": {(*string)(nil), (*string)(nil)}, "invalidateblock": nil, "livetickets": {(*types.LiveTicketsResult)(nil)}, @@ -981,6 +994,7 @@ var rpcResultTypes = map[types.Method][]interface{}{ "ping": nil, "reconsiderblock": nil, "regentemplate": nil, + "sendrawmixmessage": nil, "sendrawtransaction": {(*string)(nil)}, "setgenerate": nil, "stop": {(*string)(nil)}, @@ -996,19 +1010,21 @@ var rpcResultTypes = map[types.Method][]interface{}{ // Websocket commands. "loadtxfilter": nil, - "notifywinningtickets": nil, - "notifynewtickets": nil, "notifyblocks": nil, - "notifywork": nil, - "notifytspend": nil, + "notifymixmessages": nil, + "notifynewtickets": nil, "notifynewtransactions": nil, + "notifytspend": nil, + "notifywinningtickets": nil, + "notifywork": nil, "rebroadcastwinners": nil, "rescan": {(*types.RescanResult)(nil)}, "session": {(*types.SessionResult)(nil)}, "stopnotifyblocks": nil, - "stopnotifywork": nil, - "stopnotifytspend": nil, + "stopnotifymixmessages": nil, "stopnotifynewtransactions": nil, + "stopnotifytspend": nil, + "stopnotifywork": nil, } // helpCacher provides a concurrent safe type that provides help and usage for diff --git a/internal/rpcserver/rpcwebsocket.go b/internal/rpcserver/rpcwebsocket.go index d36926c8ec..8fab9d1efd 100644 --- a/internal/rpcserver/rpcwebsocket.go +++ b/internal/rpcserver/rpcwebsocket.go @@ -29,6 +29,7 @@ import ( "github.com/decred/dcrd/dcrutil/v4" "github.com/decred/dcrd/internal/blockchain" "github.com/decred/dcrd/internal/mining" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" "github.com/decred/dcrd/txscript/v4/stdscript" @@ -86,6 +87,7 @@ var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{ "notifywinningtickets": handleWinningTickets, "notifynewtickets": handleNewTickets, "notifynewtransactions": handleNotifyNewTransactions, + "notifymixmessages": handleNotifyMixMessages, "rebroadcastwinners": handleRebroadcastWinners, "rescan": handleRescan, "session": handleSession, @@ -93,6 +95,7 @@ var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{ "stopnotifywork": handleStopNotifyWork, "stopnotifytspend": handleStopNotifyTSpend, "stopnotifynewtransactions": handleStopNotifyNewTransactions, + "stopnotifymixmessages": handleStopNotifyMixMessages, } // WebsocketHandler handles a new websocket client by creating a new wsClient, @@ -278,6 +281,15 @@ func (m *wsNotificationManager) NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) { } } +// NotifyMixMessage passes a mixing message accepted by the mixpool to the +// notification manager for message broadcasting. +func (m *wsNotificationManager) NotifyMixMessage(msg mixing.Message) { + select { + case m.queueNotification <- (notificationMixMessage)(msg): + case <-m.quit: + } +} + // WinningTicketsNtfnData is the data that is used to generate // winning ticket notifications (which indicate a block and // the tickets eligible to vote on it). @@ -408,6 +420,7 @@ type notificationTxAcceptedByMempool struct { isNew bool tx *dcrutil.Tx } +type notificationMixMessage mixing.Message // Notification control requests type notificationRegisterClient wsClient @@ -424,6 +437,8 @@ type notificationRegisterNewTickets wsClient type notificationUnregisterNewTickets wsClient type notificationRegisterNewMempoolTxs wsClient type notificationUnregisterNewMempoolTxs wsClient +type notificationRegisterMixMessages wsClient +type notificationUnregisterMixMessages wsClient // notificationHandler reads notifications and control messages from the queue // handler and processes one at a time. @@ -444,6 +459,7 @@ func (m *wsNotificationManager) notificationHandler(ctx context.Context) { winningTicketNotifications := make(map[chan struct{}]*wsClient) ticketNewNotifications := make(map[chan struct{}]*wsClient) txNotifications := make(map[chan struct{}]*wsClient) + mixNotifications := make(map[chan struct{}]*wsClient) out: for { @@ -489,6 +505,17 @@ out: } m.notifyRelevantTxAccepted(n.tx, clients) + case notificationMixMessage: + m.notifyMixMessage(mixNotifications, (mixing.Message)(n)) + + case *notificationRegisterMixMessages: + wsc := (*wsClient)(n) + mixNotifications[wsc.quit] = wsc + + case *notificationUnregisterMixMessages: + wsc := (*wsClient)(n) + delete(mixNotifications, wsc.quit) + case *notificationRegisterBlocks: wsc := (*wsClient)(n) blockNotifications[wsc.quit] = wsc @@ -1179,6 +1206,58 @@ func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *dcrutil.Tx, } } +// RegisterMixMessages requests notifications to the passed websocket +// client when mixing messages are accepted to the mixpool. +func (m *wsNotificationManager) RegisterMixMessages(wsc *wsClient) { + select { + case m.queueNotification <- (*notificationRegisterMixMessages)(wsc): + case <-m.quit: + } +} + +// UnregisterMixMessages stops notifications to the websocket client of any +// newly-accepted mixing messages. +func (m *wsNotificationManager) UnregisterMixMessages(wsc *wsClient) { + select { + case m.queueNotification <- (*notificationUnregisterMixMessages)(wsc): + case <-m.quit: + } +} + +// notifyMixMessage notifies all clients subscribed to mixing messages with +// the accepted mixing message. +func (m *wsNotificationManager) notifyMixMessage(clients map[chan struct{}]*wsClient, + msg mixing.Message) { + + // Skip notification creation if no clients have requested mix + // notifications. + if len(clients) == 0 { + return + } + + // Write write message payload in hex encoding. + buf := new(bytes.Buffer) + err := msg.BtcEncode(hex.NewEncoder(buf), wire.MixVersion) + if err != nil { + // Should never error; the message has already been processed + // and accepted. + log.Errorf("Failed to serialize accepted mix message for "+ + "notification: %v", err) + return + } + + ntfn := types.NewMixMessageNtfn(msg.Command(), buf.String()) + marshaledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn) + if err != nil { + log.Errorf("Failed to marshal mix message notification: %v", + err) + return + } + for _, client := range clients { + client.QueueNotification(marshaledJSON) + } +} + // AddClient adds the passed websocket client to the notification manager. func (m *wsNotificationManager) AddClient(wsc *wsClient) { select { @@ -2173,6 +2252,20 @@ func handleStopNotifyNewTransactions(_ context.Context, wsc *wsClient, _ interfa return nil, nil } +// handleNotifyMixMessages implements the notifymixmessages command extension +// for websocket connections. +func handleNotifyMixMessages(_ context.Context, wsc *wsClient, _ interface{}) (interface{}, error) { + wsc.rpcServer.ntfnMgr.RegisterMixMessages(wsc) + return nil, nil +} + +// handleStopNotifyMixMessages implements the stopnotifymixmessages command +// extension for websocket connections. +func handleStopNotifyMixMessages(_ context.Context, wsc *wsClient, _ interface{}) (interface{}, error) { + wsc.rpcServer.ntfnMgr.UnregisterMixMessages(wsc) + return nil, nil +} + // rescanBlock rescans a block for any relevant transactions for the passed // lookup keys. Any discovered transactions are returned hex encoded as a // string slice. diff --git a/log.go b/log.go index eb4be6dacc..cddaa2e3b7 100644 --- a/log.go +++ b/log.go @@ -22,6 +22,7 @@ import ( "github.com/decred/dcrd/internal/mining/cpuminer" "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/txscript/v4" "github.com/decred/slog" @@ -68,6 +69,7 @@ var ( feesLog = backendLog.Logger("FEES") indxLog = backendLog.Logger("INDX") minrLog = backendLog.Logger("MINR") + mixpLog = backendLog.Logger("MIXP") peerLog = backendLog.Logger("PEER") rpcsLog = backendLog.Logger("RPCS") scrpLog = backendLog.Logger("SCRP") @@ -89,6 +91,7 @@ func init() { indexers.UseLogger(indxLog) mempool.UseLogger(txmpLog) mining.UseLogger(minrLog) + mixpool.UseLogger(mixpLog) cpuminer.UseLogger(minrLog) peer.UseLogger(peerLog) rpcserver.UseLogger(rpcsLog) @@ -109,6 +112,7 @@ var subsystemLoggers = map[string]slog.Logger{ "FEES": feesLog, "INDX": indxLog, "MINR": minrLog, + "MIXP": mixpLog, "PEER": peerLog, "RPCS": rpcsLog, "SCRP": scrpLog, diff --git a/peer/log.go b/peer/log.go index 90f80e615f..e7f124546c 100644 --- a/peer/log.go +++ b/peer/log.go @@ -162,7 +162,8 @@ func messageSummary(msg wire.Message) string { case *wire.MsgInitState: return fmt.Sprintf("blocks %d, votes %d, treasury spends %d", - len(msg.BlockHashes), len(msg.VoteHashes), len(msg.TSpendHashes)) + len(msg.BlockHashes), len(msg.VoteHashes), + len(msg.TSpendHashes)) } // No summary for other messages. diff --git a/peer/peer.go b/peer/peer.go index 8070f442d8..210d36fdc6 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -205,6 +205,30 @@ type MessageListeners struct { // OnInitState is invoked when a peer receives an initstate message. OnInitState func(p *Peer, msg *wire.MsgInitState) + // OnMixPairReq is invoked when a peer receives a mixpairreq message. + OnMixPairReq func(p *Peer, msg *wire.MsgMixPairReq) + + // OnMixKeyExchange is invoked when a peer receives a mixkeyxchg message. + OnMixKeyExchange func(p *Peer, msg *wire.MsgMixKeyExchange) + + // OnMixCiphertexts is invoked when a peer receives a mixcphrtxt message. + OnMixCiphertexts func(p *Peer, msg *wire.MsgMixCiphertexts) + + // OnMixSlotReserve is invoked when a peer receives a mixslotres message. + OnMixSlotReserve func(p *Peer, msg *wire.MsgMixSlotReserve) + + // OnMixDCNet is invoked when a peer receives a mixdcnet message. + OnMixDCNet func(p *Peer, msg *wire.MsgMixDCNet) + + // OnMixConfirm is invoked when a peer receives a mixconfirm message. + OnMixConfirm func(p *Peer, msg *wire.MsgMixConfirm) + + // OnMixFactoredPoly is invoked when a peer receives a mixfactpoly message. + OnMixFactoredPoly func(p *Peer, msg *wire.MsgMixFactoredPoly) + + // OnMixSecrets is invoked when a peer receives a mixsecrets message. + OnMixSecrets func(p *Peer, msg *wire.MsgMixSecrets) + // OnRead is invoked when a peer receives a wire message. It consists // of the number of bytes read, the message, and whether or not an error // in the read occurred. Typically, callers will opt to use the @@ -1068,9 +1092,16 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st addedDeadline = true case wire.CmdGetData: - // Expects a block, tx, or notfound message. + // Expects a block, tx, mix, or notfound message. pendingResponses[wire.CmdBlock] = deadline pendingResponses[wire.CmdTx] = deadline + pendingResponses[wire.CmdMixPairReq] = deadline + pendingResponses[wire.CmdMixKeyExchange] = deadline + pendingResponses[wire.CmdMixCiphertexts] = deadline + pendingResponses[wire.CmdMixSlotReserve] = deadline + pendingResponses[wire.CmdMixDCNet] = deadline + pendingResponses[wire.CmdMixConfirm] = deadline + pendingResponses[wire.CmdMixSecrets] = deadline pendingResponses[wire.CmdNotFound] = deadline addedDeadline = true @@ -1134,9 +1165,30 @@ out: fallthrough case wire.CmdTx: fallthrough + case wire.CmdMixPairReq: + fallthrough + case wire.CmdMixKeyExchange: + fallthrough + case wire.CmdMixCiphertexts: + fallthrough + case wire.CmdMixSlotReserve: + fallthrough + case wire.CmdMixDCNet: + fallthrough + case wire.CmdMixConfirm: + fallthrough + case wire.CmdMixSecrets: + fallthrough case wire.CmdNotFound: delete(pendingResponses, wire.CmdBlock) delete(pendingResponses, wire.CmdTx) + delete(pendingResponses, wire.CmdMixPairReq) + delete(pendingResponses, wire.CmdMixKeyExchange) + delete(pendingResponses, wire.CmdMixCiphertexts) + delete(pendingResponses, wire.CmdMixSlotReserve) + delete(pendingResponses, wire.CmdMixDCNet) + delete(pendingResponses, wire.CmdMixConfirm) + delete(pendingResponses, wire.CmdMixSecrets) delete(pendingResponses, wire.CmdNotFound) default: @@ -1450,6 +1502,46 @@ out: p.cfg.Listeners.OnInitState(p, msg) } + case *wire.MsgMixPairReq: + if p.cfg.Listeners.OnMixPairReq != nil { + p.cfg.Listeners.OnMixPairReq(p, msg) + } + + case *wire.MsgMixKeyExchange: + if p.cfg.Listeners.OnMixKeyExchange != nil { + p.cfg.Listeners.OnMixKeyExchange(p, msg) + } + + case *wire.MsgMixCiphertexts: + if p.cfg.Listeners.OnMixCiphertexts != nil { + p.cfg.Listeners.OnMixCiphertexts(p, msg) + } + + case *wire.MsgMixSlotReserve: + if p.cfg.Listeners.OnMixSlotReserve != nil { + p.cfg.Listeners.OnMixSlotReserve(p, msg) + } + + case *wire.MsgMixDCNet: + if p.cfg.Listeners.OnMixDCNet != nil { + p.cfg.Listeners.OnMixDCNet(p, msg) + } + + case *wire.MsgMixConfirm: + if p.cfg.Listeners.OnMixConfirm != nil { + p.cfg.Listeners.OnMixConfirm(p, msg) + } + + case *wire.MsgMixFactoredPoly: + if p.cfg.Listeners.OnMixFactoredPoly != nil { + p.cfg.Listeners.OnMixFactoredPoly(p, msg) + } + + case *wire.MsgMixSecrets: + if p.cfg.Listeners.OnMixSecrets != nil { + p.cfg.Listeners.OnMixSecrets(p, msg) + } + default: log.Debugf("Received unhandled message of type %v "+ "from %v", rmsg.Command(), p) diff --git a/rpc/jsonrpc/types/chainsvrcmds.go b/rpc/jsonrpc/types/chainsvrcmds.go index 4ec2ff443e..67de037986 100644 --- a/rpc/jsonrpc/types/chainsvrcmds.go +++ b/rpc/jsonrpc/types/chainsvrcmds.go @@ -552,6 +552,15 @@ func NewGetMiningInfoCmd() *GetMiningInfoCmd { return &GetMiningInfoCmd{} } +// GetMixPairRequestsCmd defines the getmixpairrequests JSON-RPC command. +type GetMixPairRequestsCmd struct{} + +// NewGetMixPairRequestsCmd returns a new instance which can be used to issue a +// getmixpairrequests JSON-RPC command. +func NewGetMixPairRequestsCmd() *GetMixPairRequestsCmd { + return &GetMixPairRequestsCmd{} +} + // GetNetworkInfoCmd defines the getnetworkinfo JSON-RPC command. type GetNetworkInfoCmd struct{} @@ -898,6 +907,21 @@ func NewReconsiderBlockCmd(hash string) *ReconsiderBlockCmd { } } +// SendRawMixMessage defines the sendrawmixmessage JSON-RPC command. +type SendRawMixMessageCmd struct { + Command string + Message string +} + +// NewSendRawMixMessageCmd returns a new instance which can be used to issue a +// sendrawmixmessage JSON-RPC command. +func NewSendRawMixMessageCmd(command, message string) *SendRawMixMessageCmd { + return &SendRawMixMessageCmd{ + Command: command, + Message: message, + } +} + // SendRawTransactionCmd defines the sendrawtransaction JSON-RPC command. type SendRawTransactionCmd struct { HexTx string @@ -1122,6 +1146,7 @@ func init() { dcrjson.MustRegister(Method("getinfo"), (*GetInfoCmd)(nil), flags) dcrjson.MustRegister(Method("getmempoolinfo"), (*GetMempoolInfoCmd)(nil), flags) dcrjson.MustRegister(Method("getmininginfo"), (*GetMiningInfoCmd)(nil), flags) + dcrjson.MustRegister(Method("getmixpairrequests"), (*GetMixPairRequestsCmd)(nil), flags) dcrjson.MustRegister(Method("getnetworkinfo"), (*GetNetworkInfoCmd)(nil), flags) dcrjson.MustRegister(Method("getnettotals"), (*GetNetTotalsCmd)(nil), flags) dcrjson.MustRegister(Method("getnetworkhashps"), (*GetNetworkHashPSCmd)(nil), flags) @@ -1145,6 +1170,7 @@ func init() { dcrjson.MustRegister(Method("ping"), (*PingCmd)(nil), flags) dcrjson.MustRegister(Method("reconsiderblock"), (*ReconsiderBlockCmd)(nil), flags) dcrjson.MustRegister(Method("regentemplate"), (*RegenTemplateCmd)(nil), flags) + dcrjson.MustRegister(Method("sendrawmixmessage"), (*SendRawMixMessageCmd)(nil), flags) dcrjson.MustRegister(Method("sendrawtransaction"), (*SendRawTransactionCmd)(nil), flags) dcrjson.MustRegister(Method("setgenerate"), (*SetGenerateCmd)(nil), flags) dcrjson.MustRegister(Method("stop"), (*StopCmd)(nil), flags) diff --git a/rpc/jsonrpc/types/chainsvrwscmds.go b/rpc/jsonrpc/types/chainsvrwscmds.go index c1998337f7..1f256b24ad 100644 --- a/rpc/jsonrpc/types/chainsvrwscmds.go +++ b/rpc/jsonrpc/types/chainsvrwscmds.go @@ -153,6 +153,24 @@ func NewNotifyNewTransactionsCmd(verbose *bool) *NotifyNewTransactionsCmd { } } +// NotifyMixMessagesCmd defines the notifymixmessages JSON-RPC command. +type NotifyMixMessagesCmd struct{} + +// NewNotifyMixMessagesCmd returns a new instance which can be used to issue a +// notifymixmessages JSON-RPC command. +func NewNotifyMixMessagesCmd() *NotifyMixMessagesCmd { + return &NotifyMixMessagesCmd{} +} + +// StopNotifyMixMessagesCmd defines the stopnotifymixmessages JSON-RPC command. +type StopNotifyMixMessagesCmd struct{} + +// StopNewNotifyMixMessagesCmd returns a new instance which can be used to issue a +// stopnotifymixmessages JSON-RPC command. +func StopNewNotifyMixMessagesCmd() *StopNotifyMixMessagesCmd { + return &StopNotifyMixMessagesCmd{} +} + // SessionCmd defines the session JSON-RPC command. type SessionCmd struct{} @@ -197,11 +215,13 @@ func init() { dcrjson.MustRegister(Method("notifynewtransactions"), (*NotifyNewTransactionsCmd)(nil), flags) dcrjson.MustRegister(Method("notifynewtickets"), (*NotifyNewTicketsCmd)(nil), flags) dcrjson.MustRegister(Method("notifywinningtickets"), (*NotifyWinningTicketsCmd)(nil), flags) + dcrjson.MustRegister(Method("notifymixmessages"), (*NotifyMixMessagesCmd)(nil), flags) dcrjson.MustRegister(Method("rebroadcastwinners"), (*RebroadcastWinnersCmd)(nil), flags) dcrjson.MustRegister(Method("session"), (*SessionCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifyblocks"), (*StopNotifyBlocksCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifywork"), (*StopNotifyWorkCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifytspend"), (*StopNotifyTSpendCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifynewtransactions"), (*StopNotifyNewTransactionsCmd)(nil), flags) + dcrjson.MustRegister(Method("stopnotifymixmessages"), (*StopNotifyMixMessagesCmd)(nil), flags) dcrjson.MustRegister(Method("rescan"), (*RescanCmd)(nil), flags) } diff --git a/rpc/jsonrpc/types/chainsvrwsntfns.go b/rpc/jsonrpc/types/chainsvrwsntfns.go index 5942a86b54..2f563229f5 100644 --- a/rpc/jsonrpc/types/chainsvrwsntfns.go +++ b/rpc/jsonrpc/types/chainsvrwsntfns.go @@ -52,6 +52,9 @@ const ( // WinningTicketsNtfnMethod is the method of the daemon winningtickets // notification. WinningTicketsNtfnMethod Method = "winningtickets" + + // MixMessageNtfnMethod is the method of the mixmessage notification. + MixMessageNtfnMethod Method = "mixmessage" ) // BlockConnectedNtfn defines the blockconnected JSON-RPC notification. @@ -207,6 +210,20 @@ func NewWinningTicketsNtfn(hash string, height int32, tickets map[string]string) } } +// MixMessageNtfn defines the mixmessage JSON-RPC notification. +type MixMessageNtfn struct { + Command string `json:"command"` + Payload string `json:"payload"` +} + +// NewMixMessageNtfn returns a new instance which can be used to issue a +// mixmessage JSON-RPC notification. +func NewMixMessageNtfn(command, payload string) *MixMessageNtfn { + return &MixMessageNtfn{ + Command: command, + Payload: payload, + } +} func init() { // The commands in this file are only usable by websockets and are // notifications. @@ -222,4 +239,5 @@ func init() { dcrjson.MustRegister(TxAcceptedVerboseNtfnMethod, (*TxAcceptedVerboseNtfn)(nil), flags) dcrjson.MustRegister(RelevantTxAcceptedNtfnMethod, (*RelevantTxAcceptedNtfn)(nil), flags) dcrjson.MustRegister(WinningTicketsNtfnMethod, (*WinningTicketsNtfn)(nil), flags) + dcrjson.MustRegister(MixMessageNtfnMethod, (*MixMessageNtfn)(nil), flags) } diff --git a/rpcadaptors.go b/rpcadaptors.go index 113c56a97b..d629bfec23 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -20,6 +20,7 @@ import ( "github.com/decred/dcrd/internal/mining/cpuminer" "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/wire" ) @@ -282,6 +283,15 @@ func (cm *rpcConnManager) RelayTransactions(txns []*dcrutil.Tx) { cm.server.relayTransactions(txns) } +// RelayMixMessages generates and relays inventory vectors for all of the +// passed mixing messages to all connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserver.ConnManager interface implementation. +func (cm *rpcConnManager) RelayMixMessages(msgs []mixing.Message) { + cm.server.relayMixMessages(msgs) +} + // Lookup defines the DNS lookup function to be used. // // This function is safe for concurrent access and is part of the @@ -348,6 +358,12 @@ func (b *rpcSyncMgr) RecentlyConfirmedTxn(hash *chainhash.Hash) bool { return b.server.recentlyConfirmedTxns.Contains(hash[:]) } +// SubmitMixMessage locally processes the mixing message. +func (b *rpcSyncMgr) SubmitMixMessage(msg mixing.Message) error { + _, err := b.server.mixMsgPool.AcceptMessage(msg) + return err +} + // rpcUtxoEntry represents a utxo entry for use with the RPC server and // implements the rpcserver.UtxoEntry interface. type rpcUtxoEntry struct { diff --git a/server.go b/server.go index da7921970f..e789ec707c 100644 --- a/server.go +++ b/server.go @@ -14,6 +14,7 @@ import ( "encoding/binary" "errors" "fmt" + "hash" "math" "net" "os" @@ -33,6 +34,7 @@ import ( "github.com/decred/dcrd/chaincfg/v3" "github.com/decred/dcrd/connmgr/v3" "github.com/decred/dcrd/container/apbf" + "github.com/decred/dcrd/crypto/blake256" "github.com/decred/dcrd/database/v3" "github.com/decred/dcrd/dcrutil/v4" "github.com/decred/dcrd/internal/blockchain" @@ -45,6 +47,8 @@ import ( "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/txscript/v4" "github.com/decred/dcrd/wire" @@ -512,6 +516,7 @@ type server struct { txMemPool *mempool.TxPool feeEstimator *fees.Estimator cpuMiner *cpuminer.CPUMiner + mixMsgPool *mixpool.Pool modifyRebroadcastInv chan interface{} newPeers chan *serverPeer donePeers chan *serverPeer @@ -570,8 +575,9 @@ type serverPeer struct { // The following fields are used to synchronize the net sync manager and // server. - txProcessed chan struct{} - blockProcessed chan struct{} + txProcessed chan struct{} + blockProcessed chan struct{} + mixMsgProcessed chan error // peerNa is network address of the peer connected to. peerNa atomic.Pointer[wire.NetAddress] @@ -590,19 +596,27 @@ type serverPeer struct { // data item requests that still need to be served. getDataQueue chan []*wire.InvVect numPendingGetDataItemReqs atomic.Uint32 + + // blake256Hasher is the hash.Hash object that is reused by the + // message listener callbacks (the serverPeer's On* methods) to hash + // mixing messages. It does not require locking, as the message + // listeners are executed serially. + blake256Hasher hash.Hash } // newServerPeer returns a new serverPeer instance. The peer needs to be set by // the caller. func newServerPeer(s *server, isPersistent bool) *serverPeer { return &serverPeer{ - server: s, - persistent: isPersistent, - knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate), - quit: make(chan struct{}), - txProcessed: make(chan struct{}, 1), - blockProcessed: make(chan struct{}, 1), - getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs), + server: s, + persistent: isPersistent, + knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate), + quit: make(chan struct{}), + txProcessed: make(chan struct{}, 1), + blockProcessed: make(chan struct{}, 1), + mixMsgProcessed: make(chan error, 1), + getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs), + blake256Hasher: blake256.New(), } } @@ -666,6 +680,16 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, continueHash := sp.continueHash.Load() sendInv = continueHash != nil && *continueHash == *blockHash + case wire.InvTypeMix: + mixHash := &iv.Hash + msg, err := sp.server.mixMsgPool.Message(mixHash) + if err != nil { + peerLog.Tracef("Unable to fetch requested mix message %v: %v", + mixHash, err) + break + } + dataMsg = msg + default: peerLog.Warnf("Unknown type '%d' in inventory request from %s", iv.Type, sp) @@ -1161,7 +1185,7 @@ func (sp *serverPeer) OnMiningState(_ *peer.Peer, msg *wire.MsgMiningState) { } err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, blockHashes, - voteHashes, nil) + voteHashes, nil, nil) if err != nil { peerLog.Warnf("couldn't handle mining state message: %v", err.Error()) @@ -1248,7 +1272,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { // requests the data advertised in the message from the peer. func (sp *serverPeer) OnInitState(_ *peer.Peer, msg *wire.MsgInitState) { err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, - msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes) + msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes, nil) if err != nil { peerLog.Warnf("couldn't handle init state message: %v", err) } @@ -1328,6 +1352,12 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { sp.Disconnect() return } + if invVect.Type == wire.InvTypeMix { + peerLog.Infof("Peer %v is announcing mix messages -- disconnecting", + sp) + sp.Disconnect() + return + } err := newInv.AddInvVect(invVect) if err != nil { peerLog.Errorf("Failed to add inventory vector: %v", err) @@ -1625,6 +1655,86 @@ func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { sp.server.addrManager.AddAddresses(addrList, remoteAddr) } +// onMixMessage is the generic handler for all mix messages handler callbacks. +func (sp *serverPeer) onMixMessage(msg mixing.Message) { + if cfg.BlocksOnly { + peerLog.Tracef("Ignoring mix message %v from %v - blocksonly "+ + "enabled", msg.Hash(), sp) + return + } + + // Calculate the message hash, so it can be added to known inventory + // and used by the sync manager. + msg.WriteHash(sp.blake256Hasher) + hash := msg.Hash() + + // Add the message to the known inventory for the peer. + iv := wire.NewInvVect(wire.InvTypeMix, &hash) + sp.AddKnownInventory(iv) + + // Queue the message to be handled by the net sync manager + // XXX: add ban score increases for non-instaban errors? + sp.server.syncManager.QueueMixMsg(msg, sp.syncMgrPeer, sp.mixMsgProcessed) + err := <-sp.mixMsgProcessed + var missingOwnPRErr *mixpool.MissingOwnPRError + if errors.As(err, &missingOwnPRErr) { + mixHashes := []chainhash.Hash{missingOwnPRErr.MissingPR} + sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, + nil, nil, nil, mixHashes) + return + } + var ruleError *mixpool.RuleError + if errors.As(err, &ruleError) { + sp.server.BanPeer(sp) + sp.Disconnect() + } +} + +// OnMixPairReq submits a received mixing pair request message to the mixpool. +func (sp *serverPeer) OnMixPairReq(_ *peer.Peer, msg *wire.MsgMixPairReq) { + sp.onMixMessage(msg) +} + +// OnMixKeyExchange submits a received mixing key exchange message to the +// mixpool. +func (sp *serverPeer) OnMixKeyExchange(_ *peer.Peer, msg *wire.MsgMixKeyExchange) { + sp.onMixMessage(msg) +} + +// OnMixCiphertexts submits a received mixing ciphertext exchange message to +// the mixpool. +func (sp *serverPeer) OnMixCiphertexts(_ *peer.Peer, msg *wire.MsgMixCiphertexts) { + sp.onMixMessage(msg) +} + +// OnMixSlotReserve submits a received mixing slot reservation message to the +// mixpool. +func (sp *serverPeer) OnMixSlotReserve(_ *peer.Peer, msg *wire.MsgMixSlotReserve) { + sp.onMixMessage(msg) +} + +// OnMixDCNet submits a received mixing XOR DC-net message to the mixpool. +func (sp *serverPeer) OnMixDCNet(_ *peer.Peer, msg *wire.MsgMixDCNet) { + sp.onMixMessage(msg) +} + +// OnMixConfirm submits a received mixing confirmation message to the mixpool. +func (sp *serverPeer) OnMixConfirm(_ *peer.Peer, msg *wire.MsgMixConfirm) { + sp.onMixMessage(msg) +} + +// OnMixFactoredPoly submits a received factored polynomial message to the +// mixpool. +func (sp *serverPeer) OnMixFactoredPoly(_ *peer.Peer, msg *wire.MsgMixFactoredPoly) { + sp.onMixMessage(msg) +} + +// OnMixSecrets submits a received mixing reveal secrets message to the +// mixpool. +func (sp *serverPeer) OnMixSecrets(_ *peer.Peer, msg *wire.MsgMixSecrets) { + sp.onMixMessage(msg) +} + // OnRead is invoked when a peer receives a message and it is used to update // the bytes received by the server. func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) { @@ -1650,13 +1760,15 @@ func (sp *serverPeer) OnNotFound(_ *peer.Peer, msg *wire.MsgNotFound) { return } - var numBlocks, numTxns uint32 + var numBlocks, numTxns, numMixMsgs uint32 for _, inv := range msg.InvList { switch inv.Type { case wire.InvTypeBlock: numBlocks++ case wire.InvTypeTx: numTxns++ + case wire.InvTypeMix: + numMixMsgs++ default: peerLog.Debugf("Invalid inv type '%d' in notfound message from %s", inv.Type, sp) @@ -1678,6 +1790,13 @@ func (sp *serverPeer) OnNotFound(_ *peer.Peer, msg *wire.MsgNotFound) { return } } + if numMixMsgs > 0 { + mixStr := pickNoun(uint64(numMixMsgs), "mix message", "mix messages") + reason := fmt.Sprintf("%d %v not found", numMixMsgs, mixStr) + if sp.addBanScore(0, 10*numMixMsgs, reason) { + return + } + } sp.server.syncManager.QueueNotFound(msg, sp.syncMgrPeer) } @@ -1764,6 +1883,16 @@ func (s *server) relayTransactions(txns []*dcrutil.Tx) { } } +// relayMixMessages generates and relays inventory vectors for all of the +// passed mixing messages to all connected peers. +func (s *server) relayMixMessages(msgs []mixing.Message) { + for _, m := range msgs { + hash := m.Hash() + iv := wire.NewInvVect(wire.InvTypeMix, &hash) + s.RelayInventory(iv, m, false) + } +} + // AnnounceNewTransactions generates and relays inventory vectors and notifies // websocket clients of the passed transactions. This function should be // called whenever new transactions are added to the mempool. @@ -1778,6 +1907,19 @@ func (s *server) AnnounceNewTransactions(txns []*dcrutil.Tx) { } } +// AnnounceMixMessages generates and relays inventory vectors of the passed +// mixing messages. This function should be called whenever new messages are +// accepted to the mixpool. +func (s *server) AnnounceMixMessages(msgs []mixing.Message) { + // Generate and relay inventory vectors for all newly accepted mixing + // messages. + s.relayMixMessages(msgs) + + if s.rpcServer != nil { + s.rpcServer.NotifyMixMessages(msgs) + } +} + // TransactionConfirmed marks the provided single confirmation transaction as // no longer needing rebroadcasting and keeps track of it for use when avoiding // requests for recently confirmed transactions. @@ -2054,6 +2196,14 @@ func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { } } + if iv.Type == wire.InvTypeMix { + // Don't relay mix message inventory when unsupported + // by the negotiated protocol version. + if sp.ProtocolVersion() < wire.MixVersion { + return + } + } + // Either queue the inventory to be relayed immediately or with // the next batch depending on the immediate flag. // @@ -2302,30 +2452,38 @@ func newPeerConfig(sp *serverPeer) *peer.Config { return &peer.Config{ Listeners: peer.MessageListeners{ - OnVersion: sp.OnVersion, - OnVerAck: sp.OnVerAck, - OnMemPool: sp.OnMemPool, - OnGetMiningState: sp.OnGetMiningState, - OnMiningState: sp.OnMiningState, - OnGetInitState: sp.OnGetInitState, - OnInitState: sp.OnInitState, - OnTx: sp.OnTx, - OnBlock: sp.OnBlock, - OnInv: sp.OnInv, - OnHeaders: sp.OnHeaders, - OnGetData: sp.OnGetData, - OnGetBlocks: sp.OnGetBlocks, - OnGetHeaders: sp.OnGetHeaders, - OnGetCFilter: sp.OnGetCFilter, - OnGetCFilterV2: sp.OnGetCFilterV2, - OnGetCFiltersV2: sp.OnGetCFiltersV2, - OnGetCFHeaders: sp.OnGetCFHeaders, - OnGetCFTypes: sp.OnGetCFTypes, - OnGetAddr: sp.OnGetAddr, - OnAddr: sp.OnAddr, - OnRead: sp.OnRead, - OnWrite: sp.OnWrite, - OnNotFound: sp.OnNotFound, + OnVersion: sp.OnVersion, + OnVerAck: sp.OnVerAck, + OnMemPool: sp.OnMemPool, + OnGetMiningState: sp.OnGetMiningState, + OnMiningState: sp.OnMiningState, + OnGetInitState: sp.OnGetInitState, + OnInitState: sp.OnInitState, + OnTx: sp.OnTx, + OnBlock: sp.OnBlock, + OnMixPairReq: sp.OnMixPairReq, + OnMixKeyExchange: sp.OnMixKeyExchange, + OnMixCiphertexts: sp.OnMixCiphertexts, + OnMixSlotReserve: sp.OnMixSlotReserve, + OnMixDCNet: sp.OnMixDCNet, + OnMixConfirm: sp.OnMixConfirm, + OnMixFactoredPoly: sp.OnMixFactoredPoly, + OnMixSecrets: sp.OnMixSecrets, + OnInv: sp.OnInv, + OnHeaders: sp.OnHeaders, + OnGetData: sp.OnGetData, + OnGetBlocks: sp.OnGetBlocks, + OnGetHeaders: sp.OnGetHeaders, + OnGetCFilter: sp.OnGetCFilter, + OnGetCFilterV2: sp.OnGetCFilterV2, + OnGetCFiltersV2: sp.OnGetCFiltersV2, + OnGetCFHeaders: sp.OnGetCFHeaders, + OnGetCFTypes: sp.OnGetCFTypes, + OnGetAddr: sp.OnGetAddr, + OnAddr: sp.OnAddr, + OnRead: sp.OnRead, + OnWrite: sp.OnWrite, + OnNotFound: sp.OnNotFound, }, NewestBlock: sp.newestBlock, HostToNetAddress: func(host string, port uint16, services wire.ServiceFlag) (*wire.NetAddress, error) { @@ -3520,6 +3678,40 @@ func (c *reloadableTLSConfig) configFileClient(_ *tls.ClientHelloInfo) (*tls.Con return c.cachedConfig, nil } +// mixpoolChain adapts the internal blockchain type with a FetchUtxoEntry +// method that is compatible with the mixpool package. +type mixpoolChain struct { + blockchain *blockchain.BlockChain + mempool *mempool.TxPool +} + +var _ mixpool.BlockChain = (*mixpoolChain)(nil) +var _ mixpool.UtxoEntry = (*blockchain.UtxoEntry)(nil) + +func (m *mixpoolChain) ChainParams() *chaincfg.Params { + return m.blockchain.ChainParams() +} + +func (m *mixpoolChain) FetchUtxoEntry(op wire.OutPoint) (mixpool.UtxoEntry, error) { + if m.mempool.IsSpent(op) { + return nil, nil + } + + entry, err := m.blockchain.FetchUtxoEntry(op) + if err != nil { + return nil, err + } + if entry == nil { + return nil, err + } + return entry, nil +} + +func (m *mixpoolChain) CurrentTip() (chainhash.Hash, int64) { + snap := m.blockchain.BestSnapshot() + return snap.Hash, snap.Height +} + // makeReloadableTLSConfig returns a TLS configuration that will dynamically // reload the server certificate, server key, and client CAs from the configured // paths when the files are updated. @@ -3866,6 +4058,9 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, } s.txMemPool = mempool.New(&txC) + mixchain := &mixpoolChain{s.chain, s.txMemPool} + s.mixMsgPool = mixpool.NewPool(mixchain) + s.syncManager = netsync.New(&netsync.Config{ PeerNotifier: &s, Chain: s.chain, @@ -3876,6 +4071,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, MaxPeers: cfg.MaxPeers, MaxOrphanTxs: cfg.MaxOrphanTxs, RecentlyConfirmedTxns: s.recentlyConfirmedTxns, + MixPool: s.mixMsgPool, }) // Dump the blockchain and quit if requested. @@ -4104,6 +4300,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, UserAgentVersion: userAgentVersion, LogManager: &rpcLogManager{}, FiltererV2: s.chain, + MixPooler: s.mixMsgPool, } if s.existsAddrIndex != nil { rpcsConfig.ExistsAddresser = s.existsAddrIndex