diff --git a/go.mod b/go.mod index 51c56fd3cb..2de095cc84 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/decred/dcrd/gcs/v4 v4.0.0 github.com/decred/dcrd/lru v1.1.2 github.com/decred/dcrd/math/uint256 v1.0.1 + github.com/decred/dcrd/mixing v0.0.0 github.com/decred/dcrd/peer/v3 v3.0.2 github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.0.0 github.com/decred/dcrd/rpcclient/v8 v8.0.0 @@ -38,18 +39,20 @@ require ( github.com/jrick/bitset v1.0.0 github.com/jrick/logrotate v1.0.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - golang.org/x/sys v0.8.0 - golang.org/x/term v0.5.0 + golang.org/x/sys v0.13.0 + golang.org/x/term v0.13.0 lukechampine.com/blake3 v1.2.1 ) require ( github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect + github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a // indirect github.com/dchest/siphash v1.2.3 // indirect github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect github.com/decred/dcrd/hdkeychain/v3 v3.1.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect + golang.org/x/crypto v0.7.0 // indirect ) replace ( @@ -75,6 +78,7 @@ replace ( github.com/decred/dcrd/limits => ./limits github.com/decred/dcrd/lru => ./lru github.com/decred/dcrd/math/uint256 => ./math/uint256 + github.com/decred/dcrd/mixing => ./mixing github.com/decred/dcrd/peer/v3 => ./peer github.com/decred/dcrd/rpc/jsonrpc/types/v4 => ./rpc/jsonrpc/types github.com/decred/dcrd/rpcclient/v8 => ./rpcclient diff --git a/go.sum b/go.sum index 88cb1baeb2..b8ec5c917b 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,8 @@ +decred.org/cspp/v2 v2.0.1-0.20230307024253-8a22691aa376 h1:739v8a7LMXuCTFNodcKYVpfj70CKWvJeE3NKFDn/65I= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0= +github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a h1:clYxJ3Os0EQUKDDVU8M0oipllX0EkuFNBfhVQuIfyF0= +github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a/go.mod h1:z/9Ck1EDixEbBbZ2KH2qNHekEmDLTOZ+FyoIPWWSVOI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= @@ -53,11 +56,13 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= +golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc h1:zK/HqS5bZxDptfPJNq8v7vJfXtkU7r9TLIoSr1bXaP4= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -69,14 +74,14 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= diff --git a/internal/blockchain/chain.go b/internal/blockchain/chain.go index 1c897e78ef..8a36c8763e 100644 --- a/internal/blockchain/chain.go +++ b/internal/blockchain/chain.go @@ -297,6 +297,11 @@ type BlockChain struct { bulkImportMode bool } +// ChainParams returns the chain parameters. +func (b *BlockChain) ChainParams() *chaincfg.Params { + return b.chainParams +} + const ( // stakeMajorityCacheKeySize is comprised of the stake version and the // hash size. The stake version is a little endian uint32, hence we diff --git a/internal/netsync/interface.go b/internal/netsync/interface.go index 7a930550a8..cad185c62c 100644 --- a/internal/netsync/interface.go +++ b/internal/netsync/interface.go @@ -6,6 +6,7 @@ package netsync import ( "github.com/decred/dcrd/dcrutil/v4" + "github.com/decred/dcrd/mixing" ) // PeerNotifier provides an interface to notify peers of status changes related @@ -14,4 +15,8 @@ type PeerNotifier interface { // AnnounceNewTransactions generates and relays inventory vectors and // notifies websocket clients of the passed transactions. AnnounceNewTransactions(txns []*dcrutil.Tx) + + // AnnounceMixMessage generates and relays inventory vectors of the + // passed messages. + AnnounceMixMessages(msgs []mixing.Message) } diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 975ee32edf..a62ba4c74d 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -23,6 +23,8 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/progresslog" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" peerpkg "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/wire" ) @@ -64,6 +66,10 @@ const ( maxRejectedTxns = 62500 rejectedTxnsFPRate = 0.0000001 + // XXX these numbers were copied from rejected txns + maxRejectedMixMsgs = 625000 + rejectedMixMsgsFPRate = 0.0000001 + // maxRequestedBlocks is the maximum number of requested block // hashes to store in memory. maxRequestedBlocks = wire.MaxInvPerMsg @@ -72,6 +78,10 @@ const ( // hashes to store in memory. maxRequestedTxns = wire.MaxInvPerMsg + // maxRequestedMixMsgs is the maximum number of hashes of in-flight + // mixing messages. + maxRequestedMixMsgs = wire.MaxInvPerMsg + // maxExpectedHeaderAnnouncementsPerMsg is the maximum number of headers in // a single message that is expected when determining when the message // appears to be announcing new blocks. @@ -179,15 +189,24 @@ type processBlockMsg struct { reply chan processBlockResponse } +// mixMsg is a message type to be sent across the message channel for requesting +// a message's acceptence to the mixing pool. +type mixMsg struct { + msg mixing.Message + peer *Peer + reply chan struct{} +} + // Peer extends a common peer to maintain additional state needed by the sync // manager. The internals are intentionally unexported to create an opaque // type. type Peer struct { *peerpkg.Peer - syncCandidate bool - requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} + syncCandidate bool + requestedTxns map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]struct{} + requestedMixMsgs map[chainhash.Hash]struct{} // initialStateRequested tracks whether or not the initial state data has // been requested from the peer. @@ -207,10 +226,11 @@ type Peer struct { func NewPeer(peer *peerpkg.Peer) *Peer { isSyncCandidate := peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork return &Peer{ - Peer: peer, - syncCandidate: isSyncCandidate, - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), + Peer: peer, + syncCandidate: isSyncCandidate, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedMixMsgs: make(map[chainhash.Hash]struct{}), } } @@ -291,13 +311,15 @@ type SyncManager struct { // time. minKnownWork *uint256.Uint256 - rejectedTxns *apbf.Filter - requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} - progressLogger *progresslog.Logger - syncPeer *Peer - msgChan chan interface{} - peers map[*Peer]struct{} + rejectedTxns *apbf.Filter + rejectedMixMsgs *apbf.Filter + requestedTxns map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]struct{} + requestedMixMsgs map[chainhash.Hash]struct{} + progressLogger *progresslog.Logger + syncPeer *Peer + msgChan chan interface{} + peers map[*Peer]struct{} // hdrSyncState houses the state used to track the initial header sync // process and related stall handling. @@ -567,6 +589,9 @@ func maybeRequestInitialState(peer *Peer) { err := m.AddTypes(wire.InitStateHeadBlocks, wire.InitStateHeadBlockVotes, wire.InitStateTSpends) + if err == nil && peer.ProtocolVersion() >= wire.MixVersion { + err = m.AddType(wire.InitStateMixPRs) + } if err != nil { log.Errorf("Unexpected error building getinitstate msg: %v", err) return @@ -666,6 +691,22 @@ BlockHashes: // No peers found that have announced this data. delete(m.requestedBlocks, blockHash) } + inv.Type = wire.InvTypeMix +MixHashes: + for mixHash := range peer.requestedMixMsgs { + inv.Hash = mixHash + for pp := range m.peers { + if !pp.IsKnownInventory(&inv) { + continue + } + invs := append(requestQueues[pp], inv) + requestQueues[pp] = invs + pp.requestedMixMsgs[mixHash] = struct{}{} + continue MixHashes + } + // No peers found that have announced this data. + delete(m.requestedMixMsgs, mixHash) + } for pp, requestQueue := range requestQueues { var numRequested int32 gdmsg := wire.NewMsgGetData() @@ -754,6 +795,28 @@ func (m *SyncManager) handleTxMsg(tmsg *txMsg) { m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) } +// handleMixMsg handles mixing messages from all peers. +func (m *SyncManager) handleMixMsg(mmsg *mixMsg) { + peer := mmsg.peer + + mixHash := mmsg.msg.Hash() + + accepted, err := m.cfg.MixPool.AcceptMessage(mmsg.msg) + if err != nil { + log.Errorf("Failed to process %T mixing message %v: %v", + mmsg.msg, &mixHash, err) + return + } + if accepted == nil { + return + } + + delete(peer.requestedMixMsgs, mixHash) + delete(m.requestedMixMsgs, mixHash) + + m.cfg.PeerNotifier.AnnounceMixMessages([]mixing.Message{accepted}) +} + // maybeUpdateIsCurrent potentially updates the manager to signal it believes // the chain is considered synced. // @@ -1275,6 +1338,19 @@ func (m *SyncManager) needTx(hash *chainhash.Hash) bool { return true } +// needMixMsg returns whether or not the mixing message needs to be downloaded. +func (m *SyncManager) needMixMsg(hash *chainhash.Hash) bool { + if m.rejectedMixMsgs.Contains(hash[:]) { + return false + } + + if m.cfg.MixPool.HaveMessage(hash) { + return false + } + + return true +} + // handleInvMsg handles inv messages from all peers. This entails examining the // inventory advertised by the remote peer for block and transaction // announcements and acting accordingly. @@ -1332,6 +1408,29 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns) requestQueue = append(requestQueue, iv) } + + case wire.InvTypeMix: + // Add the mix message to the cache of known inventory + // for the peer. This helps avoid sending mix messages + // to the peer that it is already known to have. + peer.AddKnownInventory(iv) + + // Ignore mixing messages before the chain is current or + // if the messages are not needed. Pair request (PR) + // messages reference unspent outputs that must be + // checked to exist and be unspent before they are + // accepted, and all later messages must reference an + // existing PR recorded in the mixing pool. + if !isCurrent || !m.needMixMsg(&iv.Hash) { + continue + } + + // Request the mixing message if it is not already pending. + if _, exists := m.requestedMixMsgs[iv.Hash]; !exists { + limitAdd(m.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs) + limitAdd(peer.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs) + requestQueue = append(requestQueue, iv) + } } } @@ -1420,6 +1519,13 @@ out: case <-ctx.Done(): } + case *mixMsg: + m.handleMixMsg(msg) + select { + case msg.reply <- struct{}{}: + case <-ctx.Done(): + } + case *invMsg: m.handleInvMsg(msg) @@ -1538,6 +1644,15 @@ func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *Peer) { } } +// QueueMixMsg adds the passed mixing message and peer to the event handling +// queue. +func (m *SyncManager) QueueMixMsg(msg mixing.Message, peer *Peer, done chan struct{}) { + select { + case m.msgChan <- &mixMsg{msg: msg, peer: peer, reply: done}: + case <-m.quit: + } +} + // QueueNotFound adds the passed notfound message and peer to the event handling // queue. func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *Peer) { @@ -1803,6 +1918,10 @@ type Config struct { // and querying the most recently confirmed transactions. It is useful for // preventing duplicate requests. RecentlyConfirmedTxns *apbf.Filter + + // MixPool specifies the mixing pool to use for transient mixing + // messages broadcast across the network. + MixPool *mixpool.Pool } // New returns a new network chain synchronization manager. Use Run to begin @@ -1819,17 +1938,19 @@ func New(config *Config) *SyncManager { } return &SyncManager{ - cfg: *config, - rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - peers: make(map[*Peer]struct{}), - minKnownWork: minKnownWork, - hdrSyncState: makeHeaderSyncState(), - progressLogger: progresslog.New("Processed", log), - msgChan: make(chan interface{}, config.MaxPeers*3), - quit: make(chan struct{}), - syncHeight: config.Chain.BestSnapshot().Height, - isCurrent: config.Chain.IsCurrent(), + cfg: *config, + rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), + rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate), + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedMixMsgs: make(map[chainhash.Hash]struct{}), + peers: make(map[*Peer]struct{}), + minKnownWork: minKnownWork, + hdrSyncState: makeHeaderSyncState(), + progressLogger: progresslog.New("Processed", log), + msgChan: make(chan interface{}, config.MaxPeers*3), + quit: make(chan struct{}), + syncHeight: config.Chain.BestSnapshot().Height, + isCurrent: config.Chain.IsCurrent(), } } diff --git a/internal/rpcserver/interface.go b/internal/rpcserver/interface.go index 17bafa419b..a4273fe0ac 100644 --- a/internal/rpcserver/interface.go +++ b/internal/rpcserver/interface.go @@ -19,6 +19,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -130,6 +131,10 @@ type ConnManager interface { // the passed transactions to all connected peers. RelayTransactions(txns []*dcrutil.Tx) + // RelayMixMessages generates and relays inventory vectors for all of + // the passed mixing messages to all connected peers. + RelayMixMessages(msgs []mixing.Message) + // AddedNodeInfo returns information describing persistent (added) nodes. AddedNodeInfo() []Peer @@ -166,6 +171,10 @@ type SyncManager interface { // // This method may report a false positive, but never a false negative. RecentlyConfirmedTxn(hash *chainhash.Hash) bool + + // SubmitMixMessage submits the mixing message to the network after + // processing it locally. + SubmitMixMessage(msg mixing.Message) error } // UtxoEntry represents a utxo entry for use with the RPC server. @@ -675,6 +684,10 @@ type NtfnManager interface { // manager for processing. NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) + // NotifyMixMessage passes a mixing message accepted by the mixpool to the + // notification manager for message broadcasting. + NotifyMixMessage(msg mixing.Message) + // NumClients returns the number of clients actively being served. NumClients() int @@ -726,6 +739,14 @@ type NtfnManager interface { // client when new transaction are added to the memory pool. UnregisterNewMempoolTxsUpdates(wsc *wsClient) + // RegisterMixMessages requests notifications to the passed websocket + // client when new mixing messages are accepted by the mixpool. + RegisterMixMessages(wsc *wsClient) + + // UnregisterMixMessages stops notifications to the websocket client + // client of any newly-accepted mixing messages. + UnregisterMixMessages(wsc *wsClient) + // AddClient adds the passed websocket client to the notification manager. AddClient(wsc *wsClient) diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index 37b4c4fe05..1867dc9b72 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -51,6 +51,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/version" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -61,7 +62,7 @@ import ( // API version constants const ( jsonrpcSemverMajor = 8 - jsonrpcSemverMinor = 1 + jsonrpcSemverMinor = 2 jsonrpcSemverPatch = 0 ) @@ -5163,6 +5164,14 @@ func (s *Server) NotifyTSpend(tx *dcrutil.Tx) { s.ntfnMgr.NotifyTSpend(tx) } +// NotifyMixMessages notifies websocket clients that have registered to +// receive mixing message notifications of newly accepted mix messages. +func (s *Server) NotifyMixMessages(msgs []mixing.Message) { + for _, msg := range msgs { + s.ntfnMgr.NotifyMixMessage(msg) + } +} + // NotifyNewTickets notifies websocket clients that have registered for maturing // ticket updates. func (s *Server) NotifyNewTickets(tnd *blockchain.TicketNotificationsData) { diff --git a/internal/rpcserver/rpcserverhandlers_test.go b/internal/rpcserver/rpcserverhandlers_test.go index d228a3ede3..c936f20f7a 100644 --- a/internal/rpcserver/rpcserverhandlers_test.go +++ b/internal/rpcserver/rpcserverhandlers_test.go @@ -41,6 +41,7 @@ import ( "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4" @@ -531,6 +532,7 @@ func (c *testAddrManager) LocalAddresses() []addrmgr.LocalAddr { type testSyncManager struct { isCurrent bool submitBlockErr error + submitMixErr error syncPeerID int32 syncHeight int64 processTransaction []*dcrutil.Tx @@ -550,6 +552,10 @@ func (s *testSyncManager) SubmitBlock(block *dcrutil.Block) error { return s.submitBlockErr } +func (s *testSyncManager) SubmitMixMessage(msg mixing.Message) error { + return s.submitMixErr +} + // SyncPeer returns a mocked id of the current peer being synced with. func (s *testSyncManager) SyncPeerID() int32 { return s.syncPeerID @@ -870,6 +876,10 @@ func (c *testConnManager) AddRebroadcastInventory(iv *wire.InvVect, data interfa // inventory vectors for all of the passed transactions to all connected peers. func (c *testConnManager) RelayTransactions(txns []*dcrutil.Tx) {} +// RelayMixMessages generates and relays inventory vectors for all of +// the passed mixing messages to all connected peers. +func (c *testConnManager) RelayMixMessages(msgs []mixing.Message) {} + // AddedNodeInfo returns a mocked slice of persistent (added) peers. func (c *testConnManager) AddedNodeInfo() []Peer { return c.addedNodeInfo @@ -1167,6 +1177,10 @@ func (mgr *testNtfnManager) NotifyNewTickets(tnd *blockchain.TicketNotifications // manager for processing. func (mgr *testNtfnManager) NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) {} +// NotifyMixMessage passes a mixing message accepted by the mixpool to the +// notification manager for message broadcasting. +func (mgr *testNtfnManager) NotifyMixMessage(msg mixing.Message) {} + // NumClients returns the number of clients actively being served. func (mgr *testNtfnManager) NumClients() int { return mgr.clients @@ -1228,6 +1242,14 @@ func (mgr *testNtfnManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) {} // client when new transaction are added to the memory pool. func (mgr *testNtfnManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) {} +// RegisterMixMessages requests notifications to the passed websocket +// client when new mixing messages are accepted by the mixpool. +func (mgr *testNtfnManager) RegisterMixMessages(wsc *wsClient) {} + +// UnregisterMixMessages stops notifications to the websocket client +// client of any newly-accepted mixing messages. +func (mgr *testNtfnManager) UnregisterMixMessages(wsc *wsClient) {} + // AddClient adds the passed websocket client to the notification manager. func (mgr *testNtfnManager) AddClient(wsc *wsClient) {} diff --git a/internal/rpcserver/rpcserverhelp.go b/internal/rpcserver/rpcserverhelp.go index 24c6dc7f12..0e2094a03d 100644 --- a/internal/rpcserver/rpcserverhelp.go +++ b/internal/rpcserver/rpcserverhelp.go @@ -789,6 +789,14 @@ var helpDescsEnUS = map[string]string{ // StopNotifyNewTransactionsCmd help. "stopnotifynewtransactions--synopsis": "Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", + "notifymixmessages--synopsis": "Request notifications for whenever mixing messages are accepted to the mixpool.", + + "stopnotifymixmessages--synopsis": "Cancel registered notifications for whenever mixing messages are accepted to the mixpool.", + + "sendmixmessage--synopsis": "Submit a mixing message to the mixpool and broadcast it to the network and all peers", + "sendmixmessage-message": "Mixing message serialized and encoded as hex", + "sendmixmessage-command": "The wire command name of the message type", + // OutPoint help. "outpoint-hash": "The hex-encoded bytes of the outpoint hash", "outpoint-index": "The index of the outpoint", @@ -1002,13 +1010,16 @@ var rpcResultTypes = map[types.Method][]interface{}{ "notifywork": nil, "notifytspend": nil, "notifynewtransactions": nil, + "notifymixmessages": nil, "rebroadcastwinners": nil, "rescan": {(*types.RescanResult)(nil)}, + "sendmixmessage": nil, "session": {(*types.SessionResult)(nil)}, "stopnotifyblocks": nil, "stopnotifywork": nil, "stopnotifytspend": nil, "stopnotifynewtransactions": nil, + "stopnotifymixmessages": nil, } // helpCacher provides a concurrent safe type that provides help and usage for diff --git a/internal/rpcserver/rpcwebsocket.go b/internal/rpcserver/rpcwebsocket.go index d36926c8ec..c650b62010 100644 --- a/internal/rpcserver/rpcwebsocket.go +++ b/internal/rpcserver/rpcwebsocket.go @@ -14,6 +14,7 @@ import ( "fmt" "io" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -29,6 +30,7 @@ import ( "github.com/decred/dcrd/dcrutil/v4" "github.com/decred/dcrd/internal/blockchain" "github.com/decred/dcrd/internal/mining" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" "github.com/decred/dcrd/txscript/v4/stdscript" @@ -86,6 +88,7 @@ var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{ "notifywinningtickets": handleWinningTickets, "notifynewtickets": handleNewTickets, "notifynewtransactions": handleNotifyNewTransactions, + "notifymixmessages": handleNotifyMixMessages, "rebroadcastwinners": handleRebroadcastWinners, "rescan": handleRescan, "session": handleSession, @@ -93,6 +96,8 @@ var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{ "stopnotifywork": handleStopNotifyWork, "stopnotifytspend": handleStopNotifyTSpend, "stopnotifynewtransactions": handleStopNotifyNewTransactions, + "stopnotifymixmessages": handleStopNotifyMixMessages, + "sendmixmessage": handleSendMixMessage, } // WebsocketHandler handles a new websocket client by creating a new wsClient, @@ -278,6 +283,15 @@ func (m *wsNotificationManager) NotifyMempoolTx(tx *dcrutil.Tx, isNew bool) { } } +// NotifyMixMessage passes a mixing message accepted by the mixpool to the +// notification manager for message broadcasting. +func (m *wsNotificationManager) NotifyMixMessage(msg mixing.Message) { + select { + case m.queueNotification <- (notificationMixMessage)(msg): + case <-m.quit: + } +} + // WinningTicketsNtfnData is the data that is used to generate // winning ticket notifications (which indicate a block and // the tickets eligible to vote on it). @@ -408,6 +422,7 @@ type notificationTxAcceptedByMempool struct { isNew bool tx *dcrutil.Tx } +type notificationMixMessage mixing.Message // Notification control requests type notificationRegisterClient wsClient @@ -424,6 +439,8 @@ type notificationRegisterNewTickets wsClient type notificationUnregisterNewTickets wsClient type notificationRegisterNewMempoolTxs wsClient type notificationUnregisterNewMempoolTxs wsClient +type notificationRegisterMixMessages wsClient +type notificationUnregisterMixMessages wsClient // notificationHandler reads notifications and control messages from the queue // handler and processes one at a time. @@ -444,6 +461,7 @@ func (m *wsNotificationManager) notificationHandler(ctx context.Context) { winningTicketNotifications := make(map[chan struct{}]*wsClient) ticketNewNotifications := make(map[chan struct{}]*wsClient) txNotifications := make(map[chan struct{}]*wsClient) + mixNotifications := make(map[chan struct{}]*wsClient) out: for { @@ -489,6 +507,17 @@ out: } m.notifyRelevantTxAccepted(n.tx, clients) + case notificationMixMessage: + m.notifyMixMessage(mixNotifications, (mixing.Message)(n)) + + case *notificationRegisterMixMessages: + wsc := (*wsClient)(n) + mixNotifications[wsc.quit] = wsc + + case *notificationUnregisterMixMessages: + wsc := (*wsClient)(n) + delete(mixNotifications, wsc.quit) + case *notificationRegisterBlocks: wsc := (*wsClient)(n) blockNotifications[wsc.quit] = wsc @@ -1179,6 +1208,56 @@ func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *dcrutil.Tx, } } +// RegisterMixMessages requests notifications to the passed websocket +// client when mixing messages are accepted to the mixpool. +func (m *wsNotificationManager) RegisterMixMessages(wsc *wsClient) { + select { + case m.queueNotification <- (*notificationRegisterMixMessages)(wsc): + case <-m.quit: + } +} + +// UnregisterMixMessages stops notifications to the websocket client of any +// newly-accepted mixing messages. +func (m *wsNotificationManager) UnregisterMixMessages(wsc *wsClient) { + select { + case m.queueNotification <- (*notificationUnregisterMixMessages)(wsc): + case <-m.quit: + } +} + +// notifyMixMessage notifies all clients subscribed to mixing messages with +// the accepted mixing message. +func (m *wsNotificationManager) notifyMixMessage(clients map[chan struct{}]*wsClient, + msg mixing.Message) { + + // Skip notification creation if no clients have requested mix + // notifications. + if len(clients) == 0 { + return + } + + // Write write message payload in hex encoding. + buf := new(bytes.Buffer) + err := msg.BtcEncode(hex.NewEncoder(buf), wire.MixVersion) + if err != nil { + // Should never error; the message has already been processed + // and accepted. + panic(err) + } + + ntfn := types.NewMixMessageNtfn(msg.Command(), buf.String()) + marshaledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn) + if err != nil { + log.Errorf("Failed to marshal mix message notification: %v", + err) + return + } + for _, client := range clients { + client.QueueNotification(marshaledJSON) + } +} + // AddClient adds the passed websocket client to the notification manager. func (m *wsNotificationManager) AddClient(wsc *wsClient) { select { @@ -2173,6 +2252,69 @@ func handleStopNotifyNewTransactions(_ context.Context, wsc *wsClient, _ interfa return nil, nil } +// handleNotifyMixMessages implements the notifymixmessages command extension +// for websocket connections. +func handleNotifyMixMessages(_ context.Context, wsc *wsClient, _ interface{}) (interface{}, error) { + wsc.rpcServer.ntfnMgr.RegisterMixMessages(wsc) + return nil, nil +} + +// handleStopNotifyMixMessages implements the stopnotifymixmessages command +// extension for websocket connections. +func handleStopNotifyMixMessages(_ context.Context, wsc *wsClient, _ interface{}) (interface{}, error) { + wsc.rpcServer.ntfnMgr.UnregisterMixMessages(wsc) + return nil, nil +} + +func handleSendMixMessage(_ context.Context, wsc *wsClient, icmd interface{}) (interface{}, error) { + c := icmd.(*types.SendMixMessageCmd) + + // Allocate a message of the appropriate type based on the wire + // command string. + var msg mixing.Message + switch c.Command { + case wire.CmdMixPR: + msg = new(wire.MsgMixPR) + case wire.CmdMixKE: + msg = new(wire.MsgMixKE) + case wire.CmdMixCT: + msg = new(wire.MsgMixCT) + case wire.CmdMixSR: + msg = new(wire.MsgMixSR) + case wire.CmdMixDC: + msg = new(wire.MsgMixDC) + case wire.CmdMixCM: + msg = new(wire.MsgMixCM) + case wire.CmdMixRS: + msg = new(wire.MsgMixRS) + default: + err := rpcDeserializationError("Unrecognized mixing message "+ + "wire command string %q", c.Command) + return nil, err + } + + // Deserialize message. + err := msg.BtcDecode(hex.NewDecoder(strings.NewReader(c.Message)), + wire.MixVersion) + if err != nil { + return nil, rpcDeserializationError("Could not decode mix "+ + "message: %v", err) + } + + err = wsc.rpcServer.cfg.SyncMgr.SubmitMixMessage(msg) + if err != nil { + // XXX: consider a better error code/function + str := fmt.Sprintf("Rejected mix message: %s", err) + return nil, rpcMiscError(str) + } + + wsc.rpcServer.cfg.ConnMgr.RelayMixMessages([]mixing.Message{msg}) + + wsc.rpcServer.ntfnMgr.NotifyMixMessage(msg) + + return nil, nil +} + // rescanBlock rescans a block for any relevant transactions for the passed // lookup keys. Any discovered transactions are returned hex encoded as a // string slice. diff --git a/log.go b/log.go index eb4be6dacc..cddaa2e3b7 100644 --- a/log.go +++ b/log.go @@ -22,6 +22,7 @@ import ( "github.com/decred/dcrd/internal/mining/cpuminer" "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/txscript/v4" "github.com/decred/slog" @@ -68,6 +69,7 @@ var ( feesLog = backendLog.Logger("FEES") indxLog = backendLog.Logger("INDX") minrLog = backendLog.Logger("MINR") + mixpLog = backendLog.Logger("MIXP") peerLog = backendLog.Logger("PEER") rpcsLog = backendLog.Logger("RPCS") scrpLog = backendLog.Logger("SCRP") @@ -89,6 +91,7 @@ func init() { indexers.UseLogger(indxLog) mempool.UseLogger(txmpLog) mining.UseLogger(minrLog) + mixpool.UseLogger(mixpLog) cpuminer.UseLogger(minrLog) peer.UseLogger(peerLog) rpcserver.UseLogger(rpcsLog) @@ -109,6 +112,7 @@ var subsystemLoggers = map[string]slog.Logger{ "FEES": feesLog, "INDX": indxLog, "MINR": minrLog, + "MIXP": mixpLog, "PEER": peerLog, "RPCS": rpcsLog, "SCRP": scrpLog, diff --git a/peer/log.go b/peer/log.go index 90f80e615f..354f5e4322 100644 --- a/peer/log.go +++ b/peer/log.go @@ -68,6 +68,8 @@ func invSummary(invList []*wire.InvVect) string { return fmt.Sprintf("tx %s", iv.Hash) case wire.InvTypeFilteredBlock: return fmt.Sprintf("filtered block %s", iv.Hash) + case wire.InvTypeMix: + return fmt.Sprintf("mix %s", iv.Hash) } return fmt.Sprintf("unknown (%d) %s", uint32(iv.Type), iv.Hash) @@ -161,8 +163,10 @@ func messageSummary(msg wire.Message) string { return fmt.Sprintf("types %v", msg.Types) case *wire.MsgInitState: - return fmt.Sprintf("blocks %d, votes %d, treasury spends %d", - len(msg.BlockHashes), len(msg.VoteHashes), len(msg.TSpendHashes)) + return fmt.Sprintf("blocks %d, votes %d, treasury spends %d, "+ + "mix PRs %d", + len(msg.BlockHashes), len(msg.VoteHashes), + len(msg.TSpendHashes), len(msg.MixPRHashes)) } // No summary for other messages. diff --git a/peer/peer.go b/peer/peer.go index 57d793d933..ba60c54350 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -196,6 +196,27 @@ type MessageListeners struct { // OnInitState is invoked when a peer receives an initstate message. OnInitState func(p *Peer, msg *wire.MsgInitState) + // OnMixPR is invoked when a peer receives a mixpr message. + OnMixPR func(p *Peer, msg *wire.MsgMixPR) + + // OnMixKE is invoked when a peer receives a mixke message. + OnMixKE func(p *Peer, msg *wire.MsgMixKE) + + // OnMixCT is invoked when a peer receives a mixct message. + OnMixCT func(p *Peer, msg *wire.MsgMixCT) + + // OnMixSR is invoked when a peer receives a mixsr message. + OnMixSR func(p *Peer, msg *wire.MsgMixSR) + + // OnMixDC is invoked when a peer receives a mixdc message. + OnMixDC func(p *Peer, msg *wire.MsgMixDC) + + // OnMixCM is invoked when a peer receives a mixcm message. + OnMixCM func(p *Peer, msg *wire.MsgMixCM) + + // OnMixRS is invoked when a peer receives a mixrs message. + OnMixRS func(p *Peer, msg *wire.MsgMixRS) + // OnRead is invoked when a peer receives a wire message. It consists // of the number of bytes read, the message, and whether or not an error // in the read occurred. Typically, callers will opt to use the @@ -1059,9 +1080,16 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st addedDeadline = true case wire.CmdGetData: - // Expects a block, tx, or notfound message. + // Expects a block, tx, mix, or notfound message. pendingResponses[wire.CmdBlock] = deadline pendingResponses[wire.CmdTx] = deadline + pendingResponses[wire.CmdMixPR] = deadline + pendingResponses[wire.CmdMixKE] = deadline + pendingResponses[wire.CmdMixCT] = deadline + pendingResponses[wire.CmdMixSR] = deadline + pendingResponses[wire.CmdMixDC] = deadline + pendingResponses[wire.CmdMixCM] = deadline + pendingResponses[wire.CmdMixRS] = deadline pendingResponses[wire.CmdNotFound] = deadline addedDeadline = true @@ -1125,9 +1153,30 @@ out: fallthrough case wire.CmdTx: fallthrough + case wire.CmdMixPR: + fallthrough + case wire.CmdMixKE: + fallthrough + case wire.CmdMixCT: + fallthrough + case wire.CmdMixSR: + fallthrough + case wire.CmdMixDC: + fallthrough + case wire.CmdMixCM: + fallthrough + case wire.CmdMixRS: + fallthrough case wire.CmdNotFound: delete(pendingResponses, wire.CmdBlock) delete(pendingResponses, wire.CmdTx) + delete(pendingResponses, wire.CmdMixPR) + delete(pendingResponses, wire.CmdMixKE) + delete(pendingResponses, wire.CmdMixCT) + delete(pendingResponses, wire.CmdMixSR) + delete(pendingResponses, wire.CmdMixDC) + delete(pendingResponses, wire.CmdMixCM) + delete(pendingResponses, wire.CmdMixRS) delete(pendingResponses, wire.CmdNotFound) default: @@ -1428,6 +1477,41 @@ out: p.cfg.Listeners.OnInitState(p, msg) } + case *wire.MsgMixPR: + if p.cfg.Listeners.OnMixPR != nil { + p.cfg.Listeners.OnMixPR(p, msg) + } + + case *wire.MsgMixKE: + if p.cfg.Listeners.OnMixKE != nil { + p.cfg.Listeners.OnMixKE(p, msg) + } + + case *wire.MsgMixCT: + if p.cfg.Listeners.OnMixCT != nil { + p.cfg.Listeners.OnMixCT(p, msg) + } + + case *wire.MsgMixSR: + if p.cfg.Listeners.OnMixSR != nil { + p.cfg.Listeners.OnMixSR(p, msg) + } + + case *wire.MsgMixDC: + if p.cfg.Listeners.OnMixDC != nil { + p.cfg.Listeners.OnMixDC(p, msg) + } + + case *wire.MsgMixCM: + if p.cfg.Listeners.OnMixCM != nil { + p.cfg.Listeners.OnMixCM(p, msg) + } + + case *wire.MsgMixRS: + if p.cfg.Listeners.OnMixRS != nil { + p.cfg.Listeners.OnMixRS(p, msg) + } + default: log.Debugf("Received unhandled message of type %v "+ "from %v", rmsg.Command(), p) diff --git a/rpc/jsonrpc/types/chainsvrwscmds.go b/rpc/jsonrpc/types/chainsvrwscmds.go index c1998337f7..31919e606c 100644 --- a/rpc/jsonrpc/types/chainsvrwscmds.go +++ b/rpc/jsonrpc/types/chainsvrwscmds.go @@ -153,6 +153,24 @@ func NewNotifyNewTransactionsCmd(verbose *bool) *NotifyNewTransactionsCmd { } } +// NotifyMixMessagesCmd defines the notifymixmessages JSON-RPC command. +type NotifyMixMessagesCmd struct{} + +// NewNotifyMixMessagesCmd returns a new instance which can be used to issue a +// notifymixmessages JSON-RPC command. +func NewNotifyMixMessagesCmd() *NotifyMixMessagesCmd { + return &NotifyMixMessagesCmd{} +} + +// StopNotifyMixMessagesCmd defines the stopnotifymixmessages JSON-RPC command. +type StopNotifyMixMessagesCmd struct{} + +// StopNewNotifyMixMessagesCmd returns a new instance which can be used to issue a +// stopnotifymixmessages JSON-RPC command. +func StopNewNotifyMixMessagesCmd() *StopNotifyMixMessagesCmd { + return &StopNotifyMixMessagesCmd{} +} + // SessionCmd defines the session JSON-RPC command. type SessionCmd struct{} @@ -185,6 +203,21 @@ func NewRescanCmd(blockHashes []string) *RescanCmd { return &RescanCmd{BlockHashes: blockHashes} } +// SendMixMessage defines the sendmixmessage JSON-RPC command. +type SendMixMessageCmd struct { + Command string + Message string +} + +// NewSendMixMessageCmd returns a new instance which can be used to issue a +// sendmixmessage JSON-RPC command. +func NewSendMixMessageCmd(command, message string) *SendMixMessageCmd { + return &SendMixMessageCmd{ + Command: command, + Message: message, + } +} + func init() { // The commands in this file are only usable by websockets. flags := dcrjson.UFWebsocketOnly @@ -197,11 +230,14 @@ func init() { dcrjson.MustRegister(Method("notifynewtransactions"), (*NotifyNewTransactionsCmd)(nil), flags) dcrjson.MustRegister(Method("notifynewtickets"), (*NotifyNewTicketsCmd)(nil), flags) dcrjson.MustRegister(Method("notifywinningtickets"), (*NotifyWinningTicketsCmd)(nil), flags) + dcrjson.MustRegister(Method("notifymixmessages"), (*NotifyMixMessagesCmd)(nil), flags) dcrjson.MustRegister(Method("rebroadcastwinners"), (*RebroadcastWinnersCmd)(nil), flags) dcrjson.MustRegister(Method("session"), (*SessionCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifyblocks"), (*StopNotifyBlocksCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifywork"), (*StopNotifyWorkCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifytspend"), (*StopNotifyTSpendCmd)(nil), flags) dcrjson.MustRegister(Method("stopnotifynewtransactions"), (*StopNotifyNewTransactionsCmd)(nil), flags) + dcrjson.MustRegister(Method("stopnotifymixmessages"), (*StopNotifyMixMessagesCmd)(nil), flags) dcrjson.MustRegister(Method("rescan"), (*RescanCmd)(nil), flags) + dcrjson.MustRegister(Method("sendmixmessage"), (*SendMixMessageCmd)(nil), flags) } diff --git a/rpc/jsonrpc/types/chainsvrwsntfns.go b/rpc/jsonrpc/types/chainsvrwsntfns.go index 5942a86b54..2f563229f5 100644 --- a/rpc/jsonrpc/types/chainsvrwsntfns.go +++ b/rpc/jsonrpc/types/chainsvrwsntfns.go @@ -52,6 +52,9 @@ const ( // WinningTicketsNtfnMethod is the method of the daemon winningtickets // notification. WinningTicketsNtfnMethod Method = "winningtickets" + + // MixMessageNtfnMethod is the method of the mixmessage notification. + MixMessageNtfnMethod Method = "mixmessage" ) // BlockConnectedNtfn defines the blockconnected JSON-RPC notification. @@ -207,6 +210,20 @@ func NewWinningTicketsNtfn(hash string, height int32, tickets map[string]string) } } +// MixMessageNtfn defines the mixmessage JSON-RPC notification. +type MixMessageNtfn struct { + Command string `json:"command"` + Payload string `json:"payload"` +} + +// NewMixMessageNtfn returns a new instance which can be used to issue a +// mixmessage JSON-RPC notification. +func NewMixMessageNtfn(command, payload string) *MixMessageNtfn { + return &MixMessageNtfn{ + Command: command, + Payload: payload, + } +} func init() { // The commands in this file are only usable by websockets and are // notifications. @@ -222,4 +239,5 @@ func init() { dcrjson.MustRegister(TxAcceptedVerboseNtfnMethod, (*TxAcceptedVerboseNtfn)(nil), flags) dcrjson.MustRegister(RelevantTxAcceptedNtfnMethod, (*RelevantTxAcceptedNtfn)(nil), flags) dcrjson.MustRegister(WinningTicketsNtfnMethod, (*WinningTicketsNtfn)(nil), flags) + dcrjson.MustRegister(MixMessageNtfnMethod, (*MixMessageNtfn)(nil), flags) } diff --git a/rpcadaptors.go b/rpcadaptors.go index 643f9f802e..d45f57fdbb 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -20,6 +20,7 @@ import ( "github.com/decred/dcrd/internal/mining/cpuminer" "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" + "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/wire" ) @@ -282,6 +283,15 @@ func (cm *rpcConnManager) RelayTransactions(txns []*dcrutil.Tx) { cm.server.relayTransactions(txns) } +// RelayMixMessages generates and relays inventory vectors for all of the +// passed mixing messages to all connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserver.ConnManager interface implementation. +func (cm *rpcConnManager) RelayMixMessages(msgs []mixing.Message) { + cm.server.relayMixMessages(msgs) +} + // AddedNodeInfo returns information describing persistent (added) nodes. // // This function is safe for concurrent access and is part of the @@ -364,6 +374,12 @@ func (b *rpcSyncMgr) RecentlyConfirmedTxn(hash *chainhash.Hash) bool { return b.server.recentlyConfirmedTxns.Contains(hash[:]) } +// SubmitMixMessage locally processes the mixing message. +func (b *rpcSyncMgr) SubmitMixMessage(msg mixing.Message) error { + _, err := b.server.mixMsgPool.AcceptMessage(msg) + return err +} + // rpcUtxoEntry represents a utxo entry for use with the RPC server and // implements the rpcserver.UtxoEntry interface. type rpcUtxoEntry struct { diff --git a/server.go b/server.go index af8c05079f..c4914ed2fd 100644 --- a/server.go +++ b/server.go @@ -45,6 +45,8 @@ import ( "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/math/uint256" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/txscript/v4" "github.com/decred/dcrd/wire" @@ -74,7 +76,7 @@ const ( connectionRetryInterval = time.Second * 5 // maxProtocolVersion is the max protocol version the server supports. - maxProtocolVersion = wire.RemoveRejectVersion + maxProtocolVersion = wire.MixVersion // These fields are used to track known addresses on a per-peer basis. // @@ -512,6 +514,7 @@ type server struct { txMemPool *mempool.TxPool feeEstimator *fees.Estimator cpuMiner *cpuminer.CPUMiner + mixMsgPool *mixpool.Pool modifyRebroadcastInv chan interface{} newPeers chan *serverPeer donePeers chan *serverPeer @@ -571,8 +574,9 @@ type serverPeer struct { // The following fields are used to synchronize the net sync manager and // server. - txProcessed chan struct{} - blockProcessed chan struct{} + txProcessed chan struct{} + blockProcessed chan struct{} + mixMsgProcessed chan struct{} // peerNa is network address of the peer connected to. peerNa *wire.NetAddress @@ -598,13 +602,14 @@ type serverPeer struct { // the caller. func newServerPeer(s *server, isPersistent bool) *serverPeer { return &serverPeer{ - server: s, - persistent: isPersistent, - knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate), - quit: make(chan struct{}), - txProcessed: make(chan struct{}, 1), - blockProcessed: make(chan struct{}, 1), - getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs), + server: s, + persistent: isPersistent, + knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate), + quit: make(chan struct{}), + txProcessed: make(chan struct{}, 1), + blockProcessed: make(chan struct{}, 1), + mixMsgProcessed: make(chan struct{}, 1), + getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs), } } @@ -668,6 +673,16 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, continueHash := sp.continueHash.Load() sendInv = continueHash != nil && *continueHash == *blockHash + case wire.InvTypeMix: + mixHash := &iv.Hash + msg, err := sp.server.mixMsgPool.Message(mixHash) + if err != nil { + peerLog.Tracef("Unable to fetch requested mix message %v: %v", + mixHash, err) + break + } + dataMsg = msg + default: peerLog.Warnf("Unknown type '%d' in inventory request from %s", iv.Type, sp) @@ -1199,6 +1214,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { // Response data. var blockHashes, voteHashes, tspendHashes []chainhash.Hash + var mixPRHashes []chainhash.Hash // Map from the types slice into a map for easier checking. types := make(map[string]struct{}, len(msg.Types)) @@ -1208,6 +1224,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { _, wantBlocks := types[wire.InitStateHeadBlocks] _, wantVotes := types[wire.InitStateHeadBlockVotes] _, wantTSpends := types[wire.InitStateTSpends] + _, wantMixPRs := types[wire.InitStateMixPRs] // Fetch head block hashes if we need to send either them or their // votes. @@ -1243,6 +1260,12 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { tspendHashes = mp.TSpendHashes() } + // Construct mixprs to send. + if wantMixPRs { + mixpool := sp.server.mixMsgPool + mixPRHashes = mixpool.MixPRHashes() + } + // Clear out block hashes to be sent if they weren't requested. if !wantBlocks { blockHashes = nil @@ -1254,6 +1277,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { peerLog.Warnf("Unexpected error while building initstate msg: %v", err) return } + initMsg.MixPRHashes = mixPRHashes sp.QueueMessage(initMsg, nil) } @@ -1341,6 +1365,12 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { sp.Disconnect() return } + if invVect.Type == wire.InvTypeMix { + peerLog.Infof("Peer %v is announcing mix messages -- disconnecting", + sp) + sp.Disconnect() + return + } err := newInv.AddInvVect(invVect) if err != nil { peerLog.Errorf("Failed to add inventory vector: %v", err) @@ -1628,6 +1658,61 @@ func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { sp.server.addrManager.AddAddresses(addrList, remoteAddr) } +// onMixMessage is the generic handler for all mix messages handler callbacks. +func (sp *serverPeer) onMixMessage(msg mixing.Message) { + if cfg.BlocksOnly { + peerLog.Tracef("Ignoring mix message %v from %v - blocksonly "+ + "enabled", msg.Hash(), sp) + return + } + + // TODO - add ban score increases + + // Add the message to the known inventory for the peer. + hash := msg.Hash() + iv := wire.NewInvVect(wire.InvTypeMix, &hash) + sp.AddKnownInventory(iv) + + // Queue the message to be handled by the net sync manager + sp.server.syncManager.QueueMixMsg(msg, sp.syncMgrPeer, sp.mixMsgProcessed) + <-sp.mixMsgProcessed +} + +// OnMixPR submits a received mixing pair request message to the mixpool. +func (sp *serverPeer) OnMixPR(_ *peer.Peer, msg *wire.MsgMixPR) { + sp.onMixMessage(msg) +} + +// OnMixK submits a received mixing key exchange message to the mixpool. +func (sp *serverPeer) OnMixKE(_ *peer.Peer, msg *wire.MsgMixKE) { + sp.onMixMessage(msg) +} + +// OnMixCT submits a received mixing ciphertext exchange message to the mixpool. +func (sp *serverPeer) OnMixCT(_ *peer.Peer, msg *wire.MsgMixCT) { + sp.onMixMessage(msg) +} + +// OnMixSR submits a received mixing slot reservation message to the mixpool. +func (sp *serverPeer) OnMixSR(_ *peer.Peer, msg *wire.MsgMixSR) { + sp.onMixMessage(msg) +} + +// OnMixDC submits a received mixing XOR DC-net message to the mixpool. +func (sp *serverPeer) OnMixDC(_ *peer.Peer, msg *wire.MsgMixDC) { + sp.onMixMessage(msg) +} + +// OnMixCM submits a received mixing confirmation message to the mixpool. +func (sp *serverPeer) OnMixCM(_ *peer.Peer, msg *wire.MsgMixCM) { + sp.onMixMessage(msg) +} + +// OnMixRS submits a received mixing reveal secrets message to the mixpool. +func (sp *serverPeer) OnMixRS(_ *peer.Peer, msg *wire.MsgMixRS) { + sp.onMixMessage(msg) +} + // OnRead is invoked when a peer receives a message and it is used to update // the bytes received by the server. func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) { @@ -1767,6 +1852,16 @@ func (s *server) relayTransactions(txns []*dcrutil.Tx) { } } +// relayMixMessages generates and relays inventory vectors for all of the +// passed mixing messages to all connected peers. +func (s *server) relayMixMessages(msgs []mixing.Message) { + for _, m := range msgs { + hash := m.Hash() + iv := wire.NewInvVect(wire.InvTypeMix, &hash) + s.RelayInventory(iv, m, false) + } +} + // AnnounceNewTransactions generates and relays inventory vectors and notifies // websocket clients of the passed transactions. This function should be // called whenever new transactions are added to the mempool. @@ -1781,6 +1876,19 @@ func (s *server) AnnounceNewTransactions(txns []*dcrutil.Tx) { } } +// AnnounceMixMessages generates and relays inventory vectors of the passed +// mixing messages. This function should be called whenever new messages are +// accepted to the mixpool. +func (s *server) AnnounceMixMessages(msgs []mixing.Message) { + // Generate and relay inventory vectors for all newly accepted mixing + // messages. + s.relayMixMessages(msgs) + + if s.rpcServer != nil { + s.rpcServer.NotifyMixMessages(msgs) + } +} + // TransactionConfirmed marks the provided single confirmation transaction as // no longer needing rebroadcasting and keeps track of it for use when avoiding // requests for recently confirmed transactions. @@ -2054,6 +2162,14 @@ func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { } } + if iv.Type == wire.InvTypeMix { + // Don't relay mix message inventory when unsupported + // by the negotiated protocol version. + if sp.ProtocolVersion() < wire.MixVersion { + return + } + } + // Either queue the inventory to be relayed immediately or with // the next batch depending on the immediate flag. // @@ -2311,6 +2427,13 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnInitState: sp.OnInitState, OnTx: sp.OnTx, OnBlock: sp.OnBlock, + OnMixPR: sp.OnMixPR, + OnMixKE: sp.OnMixKE, + OnMixCT: sp.OnMixCT, + OnMixSR: sp.OnMixSR, + OnMixDC: sp.OnMixDC, + OnMixCM: sp.OnMixCM, + OnMixRS: sp.OnMixRS, OnInv: sp.OnInv, OnHeaders: sp.OnHeaders, OnGetData: sp.OnGetData, @@ -3504,6 +3627,26 @@ func (c *reloadableTLSConfig) configFileClient(_ *tls.ClientHelloInfo) (*tls.Con return c.cachedConfig, nil } +// mixpoolChain adapts the internal blockchain type with a FetchUtxoEntry +// method that is compatible with the mixpool package. +type mixpoolChain struct { + *blockchain.BlockChain +} + +var _ mixpool.BlockChain = (*mixpoolChain)(nil) +var _ mixpool.UtxoEntry = (*blockchain.UtxoEntry)(nil) + +func (m *mixpoolChain) FetchUtxoEntry(op wire.OutPoint) (mixpool.UtxoEntry, error) { + entry, err := m.BlockChain.FetchUtxoEntry(op) + if err != nil { + return nil, err + } + if entry == nil { + return nil, err + } + return entry, nil +} + // makeReloadableTLSConfig returns a TLS configuration that will dynamically // reload the server certificate, server key, and client CAs from the configured // paths when the files are updated. @@ -3850,6 +3993,9 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, } s.txMemPool = mempool.New(&txC) + mixchain := &mixpoolChain{s.chain} + s.mixMsgPool = mixpool.NewPool(mixchain) + s.syncManager = netsync.New(&netsync.Config{ PeerNotifier: &s, Chain: s.chain, @@ -3860,6 +4006,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, MaxPeers: cfg.MaxPeers, MaxOrphanTxs: cfg.MaxOrphanTxs, RecentlyConfirmedTxns: s.recentlyConfirmedTxns, + MixPool: s.mixMsgPool, }) // Dump the blockchain and quit if requested.