From 9477295c04fafcac8dd10d5aaf41811b0fac2165 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Wed, 15 May 2024 15:47:17 +0000 Subject: [PATCH] mixing: Add mixpool package. The mixpool package implements a memory pool of recently observed mix messages. Similar to the transaction mempool, the mixpool allows these messages to be temporarily stored in memory to be relayed through the P2P network. It handles message acceptance, expiry, UTXO ownership proof checks, and that previously referenced messages have also been accepted to the mixpool. The mixpool is designed with both full-node and wallet usage in mind, providing all of these same acceptance rules to mixing wallets with the exception of UTXO proof checks. For wallets, it also implements query functions for messages matching compatible pairings and messages belong to ongoing sessions. --- mixing/go.mod | 11 + mixing/go.sum | 38 +- mixing/mixpool/errors.go | 87 ++ mixing/mixpool/log.go | 22 + mixing/mixpool/mixpool.go | 1550 ++++++++++++++++++++++++++++++++ mixing/mixpool/mixpool_test.go | 408 +++++++++ mixing/mixpool/orphans_test.go | 262 ++++++ mixing/utxoproof/utxoproof.go | 90 ++ 8 files changed, 2465 insertions(+), 3 deletions(-) create mode 100644 mixing/mixpool/errors.go create mode 100644 mixing/mixpool/log.go create mode 100644 mixing/mixpool/mixpool.go create mode 100644 mixing/mixpool/mixpool_test.go create mode 100644 mixing/mixpool/orphans_test.go create mode 100644 mixing/utxoproof/utxoproof.go diff --git a/mixing/go.mod b/mixing/go.mod index f7624ddb3f..128b83241b 100644 --- a/mixing/go.mod +++ b/mixing/go.mod @@ -3,16 +3,27 @@ module github.com/decred/dcrd/mixing go 1.17 require ( + decred.org/cspp/v2 v2.2.0 github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a + github.com/davecgh/go-spew v1.1.1 github.com/decred/dcrd/chaincfg/chainhash v1.0.4 github.com/decred/dcrd/chaincfg/v3 v3.2.0 github.com/decred/dcrd/crypto/blake256 v1.0.1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 + github.com/decred/dcrd/dcrutil/v4 v4.0.1 + github.com/decred/dcrd/txscript/v4 v4.1.0 github.com/decred/dcrd/wire v1.7.0 + github.com/decred/slog v1.2.0 golang.org/x/crypto v0.23.0 ) require ( + github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect + github.com/dchest/siphash v1.2.3 // indirect + github.com/decred/base58 v1.0.5 // indirect + github.com/decred/dcrd/crypto/ripemd160 v1.0.2 // indirect + github.com/decred/dcrd/dcrec v1.0.1 // indirect + github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect golang.org/x/sys v0.20.0 // indirect lukechampine.com/blake3 v1.3.0 // indirect diff --git a/mixing/go.sum b/mixing/go.sum index e95c41b45f..5a81ac80a5 100644 --- a/mixing/go.sum +++ b/mixing/go.sum @@ -1,27 +1,48 @@ +decred.org/cspp/v2 v2.2.0 h1:VSOUC1w0Wo+QOGS0r1XO6TLnO16X67KuvpDmRRYyr08= +decred.org/cspp/v2 v2.2.0/go.mod h1:9nO3bfvCheOPIFZw5f6sRQ42CjBFB5RKSaJ9Iq6G4MA= +github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI= +github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0= github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a h1:clYxJ3Os0EQUKDDVU8M0oipllX0EkuFNBfhVQuIfyF0= github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a/go.mod h1:z/9Ck1EDixEbBbZ2KH2qNHekEmDLTOZ+FyoIPWWSVOI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= +github.com/decred/base58 v1.0.5 h1:hwcieUM3pfPnE/6p3J100zoRfGkQxBulZHo7GZfOqic= +github.com/decred/base58 v1.0.5/go.mod h1:s/8lukEHFA6bUQQb/v3rjUySJ2hu+RioCzLukAVkrfw= github.com/decred/dcrd/chaincfg/chainhash v1.0.4 h1:zRCv6tdncLfLTKYqu7hrXvs7hW+8FO/NvwoFvGsrluU= github.com/decred/dcrd/chaincfg/chainhash v1.0.4/go.mod h1:hA86XxlBWwHivMvxzXTSD0ZCG/LoYsFdWnCekkTMCqY= github.com/decred/dcrd/chaincfg/v3 v3.2.0 h1:6WxA92AGBkycEuWvxtZMvA76FbzbkDRoK8OGbsR2muk= github.com/decred/dcrd/chaincfg/v3 v3.2.0/go.mod h1:2rHW1TKyFmwZTVBLoU/Cmf0oxcpBjUEegbSlBfrsriI= github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= +github.com/decred/dcrd/crypto/ripemd160 v1.0.2 h1:TvGTmUBHDU75OHro9ojPLK+Yv7gDl2hnUvRocRCjsys= +github.com/decred/dcrd/crypto/ripemd160 v1.0.2/go.mod h1:uGfjDyePSpa75cSQLzNdVmWlbQMBuiJkvXw/MNKRY4M= +github.com/decred/dcrd/dcrec v1.0.1 h1:gDzlndw0zYxM5BlaV17d7ZJV6vhRe9njPBFeg4Db2UY= +github.com/decred/dcrd/dcrec v1.0.1/go.mod h1:CO+EJd8eHFb8WHa84C7ZBkXsNUIywaTHb+UAuI5uo6o= +github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 h1:l/lhv2aJCUignzls81+wvga0TFlyoZx8QxRMQgXpZik= +github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3/go.mod h1:AKpV6+wZ2MfPRJnTbQ6NPgWrKzbe9RCIlCF/FKzMtM8= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/decred/dcrd/dcrutil/v4 v4.0.1 h1:E+d2TNbpOj0f1L9RqkZkEm1QolFjajvkzxWC5WOPf1s= +github.com/decred/dcrd/dcrutil/v4 v4.0.1/go.mod h1:7EXyHYj8FEqY+WzMuRkF0nh32ueLqhutZDoW4eQ+KRc= +github.com/decred/dcrd/txscript/v4 v4.1.0 h1:uEdcibIOl6BuWj3AqmXZ9xIK/qbo6lHY9aNk29FtkrU= +github.com/decred/dcrd/txscript/v4 v4.1.0/go.mod h1:OVguPtPc4YMkgssxzP8B6XEMf/J3MB6S1JKpxgGQqi0= github.com/decred/dcrd/wire v1.6.0/go.mod h1:XQ8Xv/pN/3xaDcb7sH8FBLS9cdgVctT7HpBKKGsIACk= github.com/decred/dcrd/wire v1.7.0 h1:5JHiDjEQeS4XUl4PfnTZYLwAD/E/+LwBmPRec/fP76o= github.com/decred/dcrd/wire v1.7.0/go.mod h1:lAqrzV0SU4kyV6INLEJgDtUjJaTaVKrbF4LHtaYl+zU= +github.com/decred/slog v1.2.0 h1:soHAxV52B54Di3WtKLfPum9OFfWqwtf/ygf9njdfnPM= +github.com/decred/slog v1.2.0/go.mod h1:kVXlGnt6DHy2fV5OjSeuvCJ0OmlmTF6LFpEPMu/fOY0= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jrick/wsrpc/v2 v2.3.5/go.mod h1:7oBeDM/xMF6Yqy4GDAjpppuOf1hm6lWsaG3EaMrm+aA= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -31,6 +52,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -38,20 +61,29 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/mixing/mixpool/errors.go b/mixing/mixpool/errors.go new file mode 100644 index 0000000000..8beae56641 --- /dev/null +++ b/mixing/mixpool/errors.go @@ -0,0 +1,87 @@ +// Copyright (c) 2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mixpool + +import ( + "errors" + + "github.com/decred/dcrd/chaincfg/chainhash" +) + +// RuleError represents a mixpool rule violation. +// +// RuleErrors can be treated as a bannable offense. +type RuleError struct { + Err error +} + +func ruleError(err error) *RuleError { + return &RuleError{Err: err} +} + +func (e *RuleError) Error() string { + return e.Err.Error() +} + +func (e *RuleError) Unwrap() error { + return e.Err +} + +// Errors wrapped by RuleError. +var ( + // ErrChangeDust is returned by AcceptMessage if a pair request's + // change amount is dust. + ErrChangeDust = errors.New("change output is dust") + + // ErrInvalidMessageCount is returned by AcceptMessage if a + // pair request contains an invalid message count. + ErrInvalidMessageCount = errors.New("message count must be positive") + + // ErrInvalidScript is returned by AcceptMessage if a pair request + // contains an invalid script. + ErrInvalidScript = errors.New("invalid script") + + // ErrInvalidSessionID is returned by AcceptMessage if the message + // contains an invalid session id. + ErrInvalidSessionID = errors.New("invalid session ID") + + // ErrInvalidSignature is returned by AcceptMessage if the message is + // not properly signed for the claimed identity. + ErrInvalidSignature = errors.New("invalid message signature") + + // ErrInvalidTotalMixAmount is returned by AcceptMessage if a pair + // request contains the product of the message count and mix amount + // that exceeds the total input value. + ErrInvalidTotalMixAmount = errors.New("invalid total mix amount") + + // ErrInvalidUTXOProof is returned by AcceptMessage if a pair request + // fails to prove ownership of each utxo. + ErrInvalidUTXOProof = errors.New("invalid UTXO ownership proof") + + // ErrMissingUTXOs is returned by AcceptMessage if a pair request + // message does not reference any previous UTXOs. + ErrMissingUTXOs = errors.New("pair request contains no UTXOs") +) + +var ( + // ErrSecretsRevealed is returned by Receive if any peer has + // unexpectedly revealed their secrets during a run stage + // (Received.RSs field is nil). This requires all other peers to quit + // the run, reveal their secrets, and perform blame assignment. + ErrSecretsRevealed = errors.New("secrets revealed by peer") +) + +// MissingOwnPRError represents the error condition where a key exchange +// message cannot be accepted to the mixpool due to it not referencing the +// owner's own pair request. The KE is recorded as an orphan and may be +// processed later. The error contains all unknown PR hashes so they can be +// fetched from another instance. +type MissingOwnPRError struct { + MissingPR chainhash.Hash +} + +func (e *MissingOwnPRError) Error() string { + return "KE identity's own PR is missing from mixpool" +} diff --git a/mixing/mixpool/log.go b/mixing/mixpool/log.go new file mode 100644 index 0000000000..22d6482844 --- /dev/null +++ b/mixing/mixpool/log.go @@ -0,0 +1,22 @@ +// Copyright (c) 2020-2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mixpool + +import ( + "github.com/decred/slog" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +// The default amount of logging is none. +var log = slog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using slog. +func UseLogger(logger slog.Logger) { + log = logger +} diff --git a/mixing/mixpool/mixpool.go b/mixing/mixpool/mixpool.go new file mode 100644 index 0000000000..625109f27c --- /dev/null +++ b/mixing/mixpool/mixpool.go @@ -0,0 +1,1550 @@ +// Copyright (c) 2023-2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +// Package mixpool provides an in-memory pool of mixing messages for full nodes +// that relay these messages and mixing wallets that send and receive them. +package mixpool + +import ( + "bytes" + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/chaincfg/v3" + "github.com/decred/dcrd/dcrutil/v4" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/utxoproof" + "github.com/decred/dcrd/txscript/v4" + "github.com/decred/dcrd/txscript/v4/stdscript" + "github.com/decred/dcrd/wire" +) + +const minconf = 2 +const feeRate = 0.0001e8 + +type idPubKey = [33]byte + +type msgtype int + +// Message type constants, for quickly checking looked up entries by message +// hash match the expected type (without performing a type assertion). +// Excludes PR. +const ( + msgtypeKE msgtype = 1 + iota + msgtypeCT + msgtypeSR + msgtypeDC + msgtypeCM + msgtypeFP + msgtypeRS + + nmsgtypes = msgtypeRS +) + +func (m msgtype) String() string { + switch m { + case msgtypeKE: + return "KE" + case msgtypeCT: + return "CT" + case msgtypeSR: + return "SR" + case msgtypeDC: + return "DC" + case msgtypeCM: + return "CM" + case msgtypeFP: + return "FP" + case msgtypeRS: + return "RS" + default: + return "?" + } +} + +// entry describes non-PR messages accepted to the pool. +type entry struct { + hash chainhash.Hash + sid [32]byte + recvTime time.Time + msg mixing.Message + msgtype msgtype + run uint32 +} + +type orphan struct { + message mixing.Message + accepted time.Time +} + +type session struct { + sid [32]byte + runs []runstate + expiry uint32 + bc broadcast +} + +type runstate struct { + prs []chainhash.Hash + counts [nmsgtypes]uint32 + hashes map[chainhash.Hash]struct{} +} + +func (r *runstate) countFor(t msgtype) uint32 { + return r.counts[t-1] +} + +func (r *runstate) incrementCountFor(t msgtype) { + r.counts[t-1]++ +} + +type broadcast struct { + ch chan struct{} + mu sync.Mutex +} + +// wait returns the wait channel that is closed whenever a message is received +// for a session. Waiters must acquire the pool lock before reading messages. +func (b *broadcast) wait() <-chan struct{} { + b.mu.Lock() + ch := b.ch + b.mu.Unlock() + + return ch +} + +func (b *broadcast) signal() { + b.mu.Lock() + close(b.ch) + b.ch = make(chan struct{}) + b.mu.Unlock() +} + +// Pool records in-memory mix messages that have been broadcast over the +// peer-to-peer network. +type Pool struct { + mtx sync.RWMutex + prs map[chainhash.Hash]*wire.MsgMixPairReq + outPoints map[wire.OutPoint]chainhash.Hash + pool map[chainhash.Hash]entry + orphans map[chainhash.Hash]*orphan + orphansByID map[idPubKey]map[chainhash.Hash]mixing.Message + messagesByIdentity map[idPubKey][]chainhash.Hash + latestKE map[idPubKey]*wire.MsgMixKeyExchange + sessions map[[32]byte]*session + sessionsByTxHash map[chainhash.Hash]*session + epoch time.Duration + expireHeight uint32 + expireSem chan struct{} + + blockchain BlockChain + utxoFetcher UtxoFetcher + feeRate int64 + params *chaincfg.Params +} + +// UtxoEntry provides details regarding unspent transaction outputs. +type UtxoEntry interface { + IsSpent() bool + PkScript() []byte + ScriptVersion() uint16 + BlockHeight() int64 + Amount() int64 +} + +// UtxoFetcher defines methods used to validate unspent transaction outputs in +// the pair request message. It is optional, but should be implemented by full +// nodes that have this capability to detect and stop relay of spam and junk +// messages. +type UtxoFetcher interface { + // FetchUtxoEntry defines the function to use to fetch unspent + // transaction output information. + FetchUtxoEntry(wire.OutPoint) (UtxoEntry, error) +} + +// BlockChain queries the current status of the blockchain. Its methods should +// be able to be implemented by both full nodes and SPV wallets. +type BlockChain interface { + // ChainParams identifies which chain parameters the mixing pool is + // associated with. + ChainParams() *chaincfg.Params + + // CurrentTip returns the hash and height of the current tip block. + CurrentTip() (chainhash.Hash, int64) +} + +// NewPool returns a new mixing pool that accepts and validates mixing messages +// required for distributed transaction mixing. +func NewPool(blockchain BlockChain) *Pool { + pool := &Pool{ + prs: make(map[chainhash.Hash]*wire.MsgMixPairReq), + outPoints: make(map[wire.OutPoint]chainhash.Hash), + pool: make(map[chainhash.Hash]entry), + orphans: make(map[chainhash.Hash]*orphan), + orphansByID: make(map[idPubKey]map[chainhash.Hash]mixing.Message), + messagesByIdentity: make(map[idPubKey][]chainhash.Hash), + latestKE: make(map[idPubKey]*wire.MsgMixKeyExchange), + sessions: make(map[[32]byte]*session), + sessionsByTxHash: make(map[chainhash.Hash]*session), + epoch: 10 * time.Minute, // XXX: mainnet epoch: add to chainparams + expireHeight: 0, + expireSem: make(chan struct{}, 1), + blockchain: blockchain, + feeRate: feeRate, + params: blockchain.ChainParams(), + } + // XXX: add epoch to chainparams + if blockchain.ChainParams().Net == wire.TestNet3 { + pool.epoch = 3 * time.Minute + } + + if u, ok := blockchain.(UtxoFetcher); ok { + pool.utxoFetcher = u + } + return pool +} + +// Epoch returns the duration between mix epochs. +func (p *Pool) Epoch() time.Duration { + return p.epoch +} + +// MixPRHashes returns the hashes of all MixPR messages recorded by the pool. +// This data is provided to peers requesting initial state of the mixpool. +func (p *Pool) MixPRHashes() []chainhash.Hash { + p.mtx.RLock() + hashes := make([]chainhash.Hash, 0, len(p.prs)) + for hash := range p.prs { + hashes = append(hashes, hash) + } + p.mtx.RUnlock() + + return hashes +} + +// Message searches the mixing pool for a message by its hash. +func (p *Pool) Message(query *chainhash.Hash) (mixing.Message, error) { + p.mtx.RLock() + pr := p.prs[*query] + e, ok := p.pool[*query] + p.mtx.RUnlock() + if pr != nil { + return pr, nil + } + if !ok || e.msg == nil { + return nil, fmt.Errorf("message not found") + } + return e.msg, nil +} + +// HaveMessage checks whether the mixing pool contains a message by its hash. +func (p *Pool) HaveMessage(query *chainhash.Hash) bool { + p.mtx.RLock() + _, ok := p.pool[*query] + if !ok { + _, ok = p.prs[*query] + } + p.mtx.RUnlock() + return ok +} + +// MixPR searches the mixing pool for a PR message by its hash. +func (p *Pool) MixPR(query *chainhash.Hash) (*wire.MsgMixPairReq, error) { + var pr *wire.MsgMixPairReq + + p.mtx.RLock() + pr = p.prs[*query] + p.mtx.RUnlock() + + if pr == nil { + return nil, fmt.Errorf("PR message not found") + } + + return pr, nil +} + +// MixPRs returns all MixPR messages with hashes matching the query. Unknown +// messages are ignored. +// +// If query is nil, all PRs are returned. +// +// In both cases, any expired PRs that are still internally tracked by the +// mixpool for ongoing sessions are excluded from the result set. +func (p *Pool) MixPRs(query []chainhash.Hash) []*wire.MsgMixPairReq { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.removeConfirmedRuns() + + if query == nil { + res := make([]*wire.MsgMixPairReq, 0, len(p.prs)) + for _, pr := range p.prs { + // Exclude expired but not yet removed PRs. + if pr.Expiry <= p.expireHeight { + continue + } + + res = append(res, pr) + } + return res + } + + res := make([]*wire.MsgMixPairReq, 0, len(query)) + for i := range query { + pr, ok := p.prs[query[i]] + if ok { + // Exclude expired but not yet removed PRs. + if pr.Expiry <= p.expireHeight { + continue + } + + res = append(res, pr) + } + } + + return res +} + +// CompatiblePRs returns all MixPR messages with pairing descriptions matching +// the parameter. The order is unspecified. +func (p *Pool) CompatiblePRs(pairing []byte) []*wire.MsgMixPairReq { + p.mtx.RLock() + defer p.mtx.RUnlock() + + res := make([]*wire.MsgMixPairReq, 0, len(p.prs)) + for _, pr := range p.prs { + prPairing, _ := pr.Pairing() + if bytes.Equal(pairing, prPairing) { + res = append(res, pr) + } + } + + // Sort by decreasing expiries and remove any PRs double spending an + // output with an earlier expiry. + sort.Slice(res, func(i, j int) bool { + return res[i].Expiry >= res[j].Expiry + }) + seen := make(map[wire.OutPoint]uint32) + for i, pr := range res { + for _, utxo := range pr.UTXOs { + prevExpiry, ok := seen[utxo.OutPoint] + if !ok { + seen[utxo.OutPoint] = pr.Expiry + } else if pr.Expiry < prevExpiry { + res[i] = nil + } + } + } + filtered := res[:0] + for i := range res { + if res[i] != nil { + filtered = append(filtered, res[i]) + } + } + + // Sort again lexicographically by hash. + sort.Slice(filtered, func(i, j int) bool { + a := filtered[i].Hash() + b := filtered[j].Hash() + return bytes.Compare(a[:], b[:]) < 1 + }) + return filtered +} + +// ExpireMessagesInBackground will, after the current epoch period ends, +// remove all pair requests that indicate an expiry at or before the height +// parameter and removes all messages that chain back to a removed pair +// request. +// +// If a previous call is still waiting in the background to remove messages, +// this method has no effect, and proper usage to avoid a mixpool memory leak +// requires it to be consistently called as more blocks are processed. +func (p *Pool) ExpireMessagesInBackground(height uint32) { + p.mtx.Lock() + defer p.mtx.Unlock() + + if p.expireHeight == 0 { + p.expireHeight = height + } + + select { + case p.expireSem <- struct{}{}: + go p.expireMessages() + default: + } +} + +// waitForExpiry blocks for at least one full epoch, waiting until two epoch +// ticks from now. +func (p *Pool) waitForExpiry() { + now := time.Now().UTC() + epoch := now.Truncate(p.epoch).Add(2 * p.epoch) + duration := epoch.Sub(now) + time.Sleep(duration) +} + +func (p *Pool) expireMessages() { + p.waitForExpiry() + + p.mtx.Lock() + defer func() { + <-p.expireSem + p.mtx.Unlock() + }() + + height := p.expireHeight + p.expireHeight = 0 + + p.expireMessagesNow(height) +} + +// ExpireMessages immediately expires all pair requests and sessions built +// from them that indicate expiry at or after a block height. +func (p *Pool) ExpireMessages(height uint32) { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.expireMessagesNow(height) + p.expireHeight = 0 +} + +func (p *Pool) expireMessagesNow(height uint32) { + // Expire sessions and their messages + for sid, ses := range p.sessions { + if ses.expiry > height { + continue + } + + delete(p.sessions, sid) + for _, r := range ses.runs { + for hash := range r.hashes { + delete(p.pool, hash) + } + } + } + + // Expire PRs and remove identity tracking + for _, pr := range p.prs { + if pr.Expiry > height { + continue + } + + p.removePR(pr, "expired") + } + + // Expire orphans with old receive times, and in the case of any + // orphan KE, expire those with old epochs. + for hash, o := range p.orphans { + expire := time.Since(o.accepted) >= 20*time.Minute + if !expire { + if ke, ok := o.message.(*wire.MsgMixKeyExchange); ok { + epoch := time.Unix(int64(ke.Epoch), 0) + expire = time.Since(epoch) >= 20*time.Minute + } + } + if expire { + delete(p.orphans, hash) + delete(p.orphansByID, *(*idPubKey)(o.message.Pub())) + } + } +} + +// RemoveMessage removes a message that was rejected by the network. +func (p *Pool) RemoveMessage(msg mixing.Message) { + p.mtx.Lock() + defer p.mtx.Unlock() + + msgHash := msg.Hash() + delete(p.pool, msgHash) + if pr, ok := msg.(*wire.MsgMixPairReq); ok { + p.removePR(pr, "rejected") + } + if ke, ok := msg.(*wire.MsgMixKeyExchange); ok { + delete(p.latestKE, ke.Identity) + } +} + +// RemoveSession removes all non-PR messages from a completed or errored +// session. PR messages of a successful run (or rerun) are also removed. +func (p *Pool) RemoveSession(sid [32]byte, success bool) { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.removeSession(sid, nil, success) +} + +func (p *Pool) removeSession(sid [32]byte, txHash *chainhash.Hash, success bool) { + ses := p.sessions[sid] + if ses == nil { + return + } + + // Delete PRs used to form final run + var removePRs []chainhash.Hash + var lastRun *runstate + if success { + lastRun = &ses.runs[len(ses.runs)-1] + removePRs = lastRun.prs + } + + if txHash != nil || success { + if txHash == nil { + // XXX: may be better to store this in the runstate as + // a CM is received. + for h := range lastRun.hashes { + if e, ok := p.pool[h]; ok && e.msgtype == msgtypeCM { + cm := e.msg.(*wire.MsgMixConfirm) + hash := cm.Mix.TxHash() + txHash = &hash + break + } + } + } + if txHash != nil { + delete(p.sessionsByTxHash, *txHash) + } + } + + delete(p.sessions, sid) + for _, r := range ses.runs { + for hash := range r.hashes { + delete(p.pool, hash) + } + } + + for _, prHash := range removePRs { + delete(p.pool, prHash) + if pr := p.prs[prHash]; pr != nil { + p.removePR(pr, "mixed") + } + } +} + +// RemoveConfirmedRuns removes all messages including pair requests from +// runs which ended in each peer sending a confirm mix message. +func (p *Pool) RemoveConfirmedRuns() { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.removeConfirmedRuns() +} + +func (p *Pool) removeConfirmedRuns() { + for sid, ses := range p.sessions { + lastRun := &ses.runs[len(ses.runs)-1] + cmCount := lastRun.countFor(msgtypeCM) + if uint32(len(lastRun.prs)) != cmCount { + continue + } + + delete(p.sessions, sid) + for _, run := range ses.runs { + for hash := range run.hashes { + delete(p.pool, hash) + } + } + + for _, hash := range lastRun.prs { + delete(p.pool, hash) + pr := p.prs[hash] + if pr != nil { + p.removePR(pr, "confirmed") + } + } + } +} + +// RemoveConfirmedMixes removes sessions and messages belonging to a completed +// session that resulted in published or mined transactions. Transaction +// hashes not associated with a session are ignored. PRs from the successful +// mix run are removed from the pool. +func (p *Pool) RemoveConfirmedMixes(txHashes []chainhash.Hash) { + p.mtx.Lock() + defer p.mtx.Unlock() + + for i := range txHashes { + hash := &txHashes[i] + ses := p.sessionsByTxHash[*hash] + if ses == nil { + continue + } + + p.removeSession(ses.sid, hash, true) + } +} + +// RemoveSpentPRs removes all pair requests that are spent by any transaction +// input. +func (p *Pool) RemoveSpentPRs(txs []*wire.MsgTx) { + p.mtx.Lock() + defer p.mtx.Unlock() + + for _, tx := range txs { + txHash := tx.TxHash() + ses, ok := p.sessionsByTxHash[txHash] + if ok { + p.removeSession(ses.sid, &txHash, true) + continue + } + + for _, in := range tx.TxIn { + prHash := p.outPoints[in.PreviousOutPoint] + pr, ok := p.prs[prHash] + if ok { + p.removePR(pr, "double spent") + } + } + } +} + +// ReceiveKEsByPairing returns the most recently received run-0 KE messages by +// a peer that reference PRs of a particular pairing and epoch. +func (p *Pool) ReceiveKEsByPairing(pairing []byte, epoch uint64) []*wire.MsgMixKeyExchange { + p.mtx.RLock() + defer p.mtx.RUnlock() + + var kes []*wire.MsgMixKeyExchange + for id, ke := range p.latestKE { + if ke.Epoch != epoch { + continue + } + prHash := p.messagesByIdentity[id][0] + pr := p.prs[prHash] + prPairing, err := pr.Pairing() + if err != nil { + continue + } + if bytes.Equal(pairing, prPairing) { + kes = append(kes, ke) + } + } + return kes +} + +// RemoveUnresponsiveDuringEpoch removes pair requests of unresponsive peers +// that did not provide any key exchange messages during the epoch in which a +// mix occurred. +func (p *Pool) RemoveUnresponsiveDuringEpoch(prs []*wire.MsgMixPairReq, epoch uint64) { + p.mtx.Lock() + defer p.mtx.Unlock() + +PRLoop: + for _, pr := range prs { + for _, msgHash := range p.messagesByIdentity[pr.Identity] { + msg, ok := p.pool[msgHash].msg.(*wire.MsgMixKeyExchange) + if !ok { + continue + } + if msg.Epoch == epoch { + continue PRLoop + } + } + + p.removePR(pr, "unresponsive") + } +} + +// Received is a parameter for Pool.Receive describing the session and run to +// receive messages for, and slices for returning results. A single non-nil +// slice is required and indicates which message slice will be will be +// appended to. Received messages are unsorted. +type Received struct { + Sid [32]byte + Run uint32 + KEs []*wire.MsgMixKeyExchange + CTs []*wire.MsgMixCiphertexts + SRs []*wire.MsgMixSlotReserve + DCs []*wire.MsgMixDCNet + CMs []*wire.MsgMixConfirm + FPs []*wire.MsgMixFactoredPoly + RSs []*wire.MsgMixSecrets +} + +// Receive returns messages matching a session, run, and message type, waiting +// until all described messages have been received, or earlier with the +// messages received so far if the context is cancelled before this point. +// +// Receive only returns results for the session ID and run increment in the r +// parameter. If no such session or run has any messages currently accepted +// in the mixpool, the method immediately errors. +// +// If any secrets messages are received for the described session and run, and +// r.RSs is nil, Receive immediately returns ErrSecretsRevealed. An +// additional call to Receive with a non-nil RSs can be used to receive all of +// the secrets after each peer publishes their own revealed secrets. +func (p *Pool) Receive(ctx context.Context, expectedMessages int, r *Received) error { + sid := r.Sid + run := r.Run + var bc *broadcast + var rs *runstate + + p.mtx.RLock() + ses, ok := p.sessions[sid] + if !ok { + p.mtx.RUnlock() + return fmt.Errorf("unknown session %x", sid[:]) + } + bc = &ses.bc + if run >= uint32(len(ses.runs)) { + p.mtx.RUnlock() + return fmt.Errorf("unknown run %d", run) + } + rs = &ses.runs[run] + + nonNilSlices := 0 + if r.KEs != nil { + nonNilSlices++ + } + if r.CTs != nil { + nonNilSlices++ + } + if r.SRs != nil { + nonNilSlices++ + } + if r.DCs != nil { + nonNilSlices++ + } + if r.CMs != nil { + nonNilSlices++ + } + if r.FPs != nil { + nonNilSlices++ + } + if r.RSs != nil { + nonNilSlices++ + } + if nonNilSlices != 1 { + return fmt.Errorf("mixpool: exactly one Received slice must be non-nil") + } + +Loop: + for { + // Pool is locked for reads. Count if the total number of + // expected messages have been received. + received := 0 + for hash := range rs.hashes { + msgtype := p.pool[hash].msgtype + switch { + case msgtype == msgtypeKE && r.KEs != nil: + received++ + case msgtype == msgtypeCT && r.CTs != nil: + received++ + case msgtype == msgtypeSR && r.SRs != nil: + received++ + case msgtype == msgtypeDC && r.DCs != nil: + received++ + case msgtype == msgtypeCM && r.CMs != nil: + received++ + case msgtype == msgtypeFP && r.FPs != nil: + received++ + case msgtype == msgtypeRS: + if r.RSs == nil { + // Since initial reporters of secrets + // need to take the blame for + // erroneous blame assignment if no + // issue was detected, we only trigger + // this for RS messages that do not + // reference any other previous RS. + rs := p.pool[hash].msg.(*wire.MsgMixSecrets) + prev := rs.PrevMsgs() + if len(prev) == 0 { + p.mtx.RUnlock() + return ErrSecretsRevealed + } + } else { + received++ + } + } + } + if received >= expectedMessages { + break + } + + // Unlock while waiting for the broadcast channel. + p.mtx.RUnlock() + + select { + case <-ctx.Done(): + p.mtx.RLock() + break Loop + case <-bc.wait(): + } + + p.mtx.RLock() + } + + // Pool is locked for reads. Collect all of the messages. + for hash := range rs.hashes { + msg := p.pool[hash].msg + switch msg := msg.(type) { + case *wire.MsgMixKeyExchange: + if r.KEs != nil { + r.KEs = append(r.KEs, msg) + } + case *wire.MsgMixCiphertexts: + if r.CTs != nil { + r.CTs = append(r.CTs, msg) + } + case *wire.MsgMixSlotReserve: + if r.SRs != nil { + r.SRs = append(r.SRs, msg) + } + case *wire.MsgMixDCNet: + if r.DCs != nil { + r.DCs = append(r.DCs, msg) + } + case *wire.MsgMixConfirm: + if r.CMs != nil { + r.CMs = append(r.CMs, msg) + } + case *wire.MsgMixFactoredPoly: + if r.FPs != nil { + r.FPs = append(r.FPs, msg) + } + case *wire.MsgMixSecrets: + if r.RSs != nil { + r.RSs = append(r.RSs, msg) + } + } + } + + p.mtx.RUnlock() + return nil +} + +var zeroHash chainhash.Hash + +// AcceptMessage accepts a mixing message to the pool. +// +// Messages must contain the mixing participant's identity and contain a valid +// signature committing to all non-signature fields. +// +// PR messages will not be accepted if they reference an unknown UTXO or if not +// enough fee is contributed. Any other message will not be accepted if it +// references previous messages that are not recorded by the pool. +// +// All newly accepted messages, including any orphan key exchange messages +// that were processed after processing missing pair requests, are returned. +func (p *Pool) AcceptMessage(msg mixing.Message) (accepted []mixing.Message, err error) { + defer func() { + if err == nil { + log.Tracef("AcceptMessage: accepted message %T %v", msg, msg.Hash()) + } else { + log.Tracef("AcceptMessage: rejected message %T %v: %v", msg, msg.Hash(), err) + } + }() + + hash := msg.Hash() + if hash == zeroHash { + return nil, fmt.Errorf("message of type %T has not been hashed", msg) + } + + alreadyAccepted := func() bool { + _, ok := p.pool[hash] + if !ok { + _, ok = p.prs[hash] + } + return ok + } + + // Check if already accepted. + p.mtx.RLock() + ok := alreadyAccepted() + p.mtx.RUnlock() + if ok { + return nil, nil + } + + // Require message to be signed by the presented identity. + if !mixing.VerifySignedMessage(msg) { + return nil, ruleError(ErrInvalidSignature) + } + id := (*idPubKey)(msg.Pub()) + + var msgtype msgtype + switch msg := msg.(type) { + case *wire.MsgMixPairReq: + accepted, err := p.acceptPR(msg, &hash, id) + if err != nil { + return nil, err + } + // Avoid returning a non-nil mixing.Message in return + // variable with a nil PR. + if accepted == nil { + return nil, nil + } + + allAccepted := p.reconsiderOrphans(msg, id) + return allAccepted, nil + + case *wire.MsgMixKeyExchange: + accepted, err := p.acceptKE(msg, &hash, id) + if err != nil { + return nil, err + } + // Avoid returning a non-nil mixing.Message in return + // variable with a nil KE. + if accepted == nil { + return nil, nil + } + allAccepted := p.reconsiderOrphans(msg, id) + return allAccepted, nil + + case *wire.MsgMixCiphertexts: + msgtype = msgtypeCT + case *wire.MsgMixSlotReserve: + msgtype = msgtypeSR + case *wire.MsgMixDCNet: + msgtype = msgtypeDC + case *wire.MsgMixConfirm: + msgtype = msgtypeCM + case *wire.MsgMixFactoredPoly: + msgtype = msgtypeFP + case *wire.MsgMixSecrets: + msgtype = msgtypeRS + default: + return nil, fmt.Errorf("unknown mix message type %T", msg) + } + + if len(msg.Sid()) != 32 { + return nil, ruleError(ErrInvalidSessionID) + } + sid := *(*[32]byte)(msg.Sid()) + + p.mtx.Lock() + defer p.mtx.Unlock() + + // Read lock was given up to acquire write lock. Check if already + // accepted. + if alreadyAccepted() { + return nil, nil + } + + // Check that a message from this identity does not reuse a run number + // for the session. + var haveKE bool + for _, prevHash := range p.messagesByIdentity[*id] { + e := p.pool[prevHash] + run := msg.GetRun() + if e.msgtype == msgtype && e.msg.GetRun() == run && + bytes.Equal(e.msg.Sid(), msg.Sid()) { + return nil, fmt.Errorf("message %v by identity %x "+ + "reuses run number %d in session %x, "+ + "conflicting with already accepted message %v", + hash, *id, run, msg.Sid(), prevHash) + } + if e.msgtype == msgtypeKE && e.msg.GetRun() == run && + bytes.Equal(e.msg.Sid(), msg.Sid()) { + haveKE = true + } + } + // Save as an orphan if their KE is not (yet) known. + if !haveKE { + orphansByID := p.orphansByID[*id] + if _, ok := orphansByID[hash]; ok { + // Already an orphan. + return nil, nil + } + if orphansByID == nil { + orphansByID = make(map[chainhash.Hash]mixing.Message) + p.orphansByID[*id] = orphansByID + } + p.orphans[hash] = &orphan{ + message: msg, + accepted: time.Now(), + } + orphansByID[hash] = msg + + // TODO: Consider return an error containing the unknown + // messages, so they can be getdata'd. + return nil, nil + } + + ses := p.sessions[sid] + if ses == nil { + return nil, fmt.Errorf("%s %s belongs to unknown session %x", + msgtype, &hash, &sid) + } + + err = p.acceptEntry(msg, msgtype, &hash, id, ses) + if err != nil { + return nil, err + } + return []mixing.Message{msg}, nil +} + +// removePR removes a pair request message and all other messages and sessions +// that the peer sent and was involved in. +func (p *Pool) removePR(pr *wire.MsgMixPairReq, reason string) { + prHash := pr.Hash() + + log.Debugf("Removing %s PR %s by %x", reason, prHash, pr.Identity[:]) + + delete(p.prs, prHash) + for _, hash := range p.messagesByIdentity[pr.Identity] { + e, ok := p.pool[hash] + if !ok { + continue + } + ke, ok := e.msg.(*wire.MsgMixKeyExchange) + if ok { + p.removeSession(ke.SessionID, nil, false) + } + delete(p.pool, hash) + } + delete(p.messagesByIdentity, pr.Identity) + delete(p.latestKE, pr.Identity) + for orphanHash := range p.orphansByID[pr.Identity] { + delete(p.orphans, orphanHash) + } + delete(p.orphansByID, pr.Identity) + for i := range pr.UTXOs { + delete(p.outPoints, pr.UTXOs[i].OutPoint) + } +} + +func (p *Pool) acceptPR(pr *wire.MsgMixPairReq, hash *chainhash.Hash, id *idPubKey) (accepted *wire.MsgMixPairReq, err error) { + switch { + case len(pr.UTXOs) == 0: // Require at least one utxo. + return nil, ruleError(ErrMissingUTXOs) + case pr.MessageCount == 0: // Require at least one mixed message. + return nil, ruleError(ErrInvalidMessageCount) + case pr.InputValue < int64(pr.MessageCount)*pr.MixAmount: + return nil, ruleError(ErrInvalidTotalMixAmount) + case pr.Change != nil: + if isDustAmount(pr.Change.Value, p2pkhv0PkScriptSize, feeRate) { + return nil, ruleError(ErrChangeDust) + } + if !stdscript.IsPubKeyHashScriptV0(pr.Change.PkScript) && + !stdscript.IsScriptHashScriptV0(pr.Change.PkScript) { + return nil, ruleError(ErrInvalidScript) + } + } + + // Check that expiry has not been reached, nor that it is too far + // into the future. This limits replay attacks. + _, curHeight := p.blockchain.CurrentTip() + maxExpiry := mixing.MaxExpiry(uint32(curHeight), p.params) + switch { + case uint32(curHeight) >= pr.Expiry: + return nil, fmt.Errorf("message has expired") + case pr.Expiry > maxExpiry: + return nil, fmt.Errorf("expiry is too far into future") + } + + // Require known script classes. + switch mixing.ScriptClass(pr.ScriptClass) { + case mixing.ScriptClassP2PKHv0: + default: + return nil, fmt.Errorf("unsupported mixing script class") + } + + // Require enough fee contributed from this mixing participant. + // Size estimation assumes mixing.ScriptClassP2PKHv0 outputs and inputs. + err = checkFee(pr, p.feeRate) + if err != nil { + return nil, err + } + + // If able, sanity check UTXOs. + if p.utxoFetcher != nil { + err := p.checkUTXOs(pr, curHeight) + if err != nil { + return nil, err + } + } + + p.mtx.Lock() + defer p.mtx.Unlock() + + // Check if already accepted. + if _, ok := p.prs[*hash]; ok { + return nil, nil + } + + // Discourage identity reuse. PRs should be the first message sent by + // this identity, and there should only be one PR per identity. + if len(p.messagesByIdentity[*id]) != 0 { + return nil, fmt.Errorf("identity reused for a PR message") + } + + // Only accept PRs that double spend outpoints if they expire later + // than existing PRs. Otherwise, reject this PR message. + for i := range pr.UTXOs { + otherPRHash := p.outPoints[pr.UTXOs[i].OutPoint] + otherPR, ok := p.prs[otherPRHash] + if !ok { + continue + } + if otherPR.Expiry >= pr.Expiry { + err := fmt.Errorf("PR double spends outpoints of " + + "already-accepted PR message without " + + "increasing expiry") + return nil, err + } + } + + // Accept the PR + p.prs[*hash] = pr + for i := range pr.UTXOs { + p.outPoints[pr.UTXOs[i].OutPoint] = *hash + } + p.messagesByIdentity[*id] = append(make([]chainhash.Hash, 0, 16), *hash) + + return pr, nil +} + +// reconsiderOrphans reconsiders any messages that are currently saved as +// orphans due to missing previous PR message (in the case of KE orphans) or +// missing the identity's KE in matching session and run (for all other +// messages). The function is recursive: if a reconsidered orphan KE is +// accepted, other orphans by the identity will be considered as well. +func (p *Pool) reconsiderOrphans(accepted mixing.Message, id *idPubKey) []mixing.Message { + acceptedMessages := []mixing.Message{accepted} + + var kes []*wire.MsgMixKeyExchange + if ke, ok := accepted.(*wire.MsgMixKeyExchange); ok { + kes = append(kes, ke) + } + + // If the accepted message was a PR, there may be KE orphans that can + // be accepted now. + if pr, ok := accepted.(*wire.MsgMixPairReq); ok { + // Orphan KEs must be accepted in run order. Find all + // possible matching ones and sort by run. + var orphanKEs []*wire.MsgMixKeyExchange + for _, orphan := range p.orphansByID[*id] { + orphanKE, ok := orphan.(*wire.MsgMixKeyExchange) + if !ok { + continue + } + refsAcceptedPR := false + for _, prHash := range orphanKE.SeenPRs { + if pr.Hash() == prHash { + refsAcceptedPR = true + break + } + } + if !refsAcceptedPR { + continue + } + + orphanKEs = append(orphanKEs, orphanKE) + } + sort.Slice(orphanKEs, func(i, j int) bool { + return orphanKEs[i].Run < orphanKEs[j].Run + }) + + for _, orphanKE := range orphanKEs { + orphanKEHash := orphanKE.Hash() + _, err := p.acceptKE(orphanKE, &orphanKEHash, &orphanKE.Identity) + if err != nil { + log.Debugf("orphan KE could not be accepted: %v", err) + continue + } + + kes = append(kes, orphanKE) + delete(p.orphansByID[*id], orphanKEHash) + delete(p.orphans, orphanKEHash) + + acceptedMessages = append(acceptedMessages, orphanKE) + } + if len(p.orphansByID[*id]) == 0 { + delete(p.orphansByID, *id) + return acceptedMessages + } + } + + // For any KE that has been accepted following reconsideration after + // accepting a PR, other orphan messages may be potentially accepted + // as well. + for _, ke := range kes { + ses := p.sessions[ke.SessionID] + if ses == nil { + log.Errorf("No session %x exists for accepted KE %s", + ke.SessionID[:], ke.Hash()) + continue + } + + var acceptedOrphans []mixing.Message + for orphanHash, orphan := range p.orphansByID[*id] { + if !bytes.Equal(orphan.Sid(), ke.SessionID[:]) { + continue + } + if orphan.GetRun() != ke.Run { + continue + } + + var msgtype msgtype + switch orphan.(type) { + case *wire.MsgMixCiphertexts: + msgtype = msgtypeCT + case *wire.MsgMixSlotReserve: + msgtype = msgtypeSR + case *wire.MsgMixDCNet: + msgtype = msgtypeDC + case *wire.MsgMixConfirm: + msgtype = msgtypeCM + case *wire.MsgMixFactoredPoly: + msgtype = msgtypeFP + case *wire.MsgMixSecrets: + msgtype = msgtypeRS + default: + log.Errorf("Unknown orphan message %T %s", orphan, orphan.Hash()) + continue + } + + err := p.acceptEntry(orphan, msgtype, &orphanHash, id, ses) + if err != nil { + log.Debugf("Orphan %v by identity %x could not be "+ + "processed after accepting KE %v", + orphanHash, id[:], ke.Hash()) + continue + } + + acceptedOrphans = append(acceptedOrphans, orphan) + acceptedMessages = append(acceptedMessages, orphan) + } + for _, orphan := range acceptedOrphans { + orphanHash := orphan.Hash() + delete(p.orphansByID[*id], orphanHash) + delete(p.orphans, orphanHash) + } + if len(p.orphansByID[*id]) == 0 { + delete(p.orphansByID, *id) + return acceptedMessages + } + } + + return acceptedMessages +} + +// Check that UTXOs exist, have confirmations, sum of UTXO values matches the +// input value, and proof of ownership is valid. +func (p *Pool) checkUTXOs(pr *wire.MsgMixPairReq, curHeight int64) error { + var totalValue int64 + + for i := range pr.UTXOs { + utxo := &pr.UTXOs[i] + entry, err := p.utxoFetcher.FetchUtxoEntry(utxo.OutPoint) + if err != nil { + return err + } + if entry == nil || entry.IsSpent() { + return fmt.Errorf("output %v is not unspent", + &utxo.OutPoint) + } + height := entry.BlockHeight() + if !confirmed(minconf, height, curHeight) { + return fmt.Errorf("output %v is unconfirmed", + &utxo.OutPoint) + } + if entry.ScriptVersion() != 0 { + return fmt.Errorf("output %v does not use script version 0", + &utxo.OutPoint) + } + + // Check proof of key ownership and ability to sign coinjoin + // inputs. + var extractPubKeyHash160 func([]byte) []byte + switch { + case utxo.Opcode == 0: + extractPubKeyHash160 = stdscript.ExtractPubKeyHashV0 + case utxo.Opcode == txscript.OP_SSGEN: + extractPubKeyHash160 = stdscript.ExtractStakeGenPubKeyHashV0 + case utxo.Opcode == txscript.OP_SSRTX: + extractPubKeyHash160 = stdscript.ExtractStakeRevocationPubKeyHashV0 + case utxo.Opcode == txscript.OP_TGEN: + extractPubKeyHash160 = stdscript.ExtractTreasuryGenPubKeyHashV0 + default: + return fmt.Errorf("unsupported output script for UTXO %s", &utxo.OutPoint) + } + valid := validateOwnerProofP2PKHv0(extractPubKeyHash160, + entry.PkScript(), utxo.PubKey, utxo.Signature, pr.Expires()) + if !valid { + return ruleError(ErrInvalidUTXOProof) + } + + totalValue += entry.Amount() + } + + if totalValue != pr.InputValue { + return fmt.Errorf("input value does not match sum of UTXO " + + "values") + } + + return nil +} + +func validateOwnerProofP2PKHv0(extractFunc func([]byte) []byte, pkscript, pubkey, sig []byte, expires uint32) bool { + extractedHash160 := extractFunc(pkscript) + pubkeyHash160 := dcrutil.Hash160(pubkey) + if !bytes.Equal(extractedHash160, pubkeyHash160) { + return false + } + + return utxoproof.ValidateSecp256k1P2PKH(pubkey, sig, expires) +} + +func (p *Pool) acceptKE(ke *wire.MsgMixKeyExchange, hash *chainhash.Hash, id *idPubKey) (accepted *wire.MsgMixKeyExchange, err error) { + // Validate PR order and session ID. + err = mixing.ValidateSession(ke) + if err != nil { + return nil, ruleError(err) + } + + if ke.Pos >= uint32(len(ke.SeenPRs)) { + err := fmt.Errorf("peer position is an invalid seen PRs position") + return nil, ruleError(err) + } + + p.mtx.Lock() + defer p.mtx.Unlock() + + // Check if already accepted. + if _, ok := p.pool[*hash]; ok { + return nil, nil + } + + // While KEs are allowed to reference unknown PRs, they must at least + // reference the PR submitted by their own identity. If not, the KE + // is saved as an orphan and may be processed later. + // Of all PRs that are known, their pairing types must be compatible. + var missingOwnPR *chainhash.Hash + prs := make([]*wire.MsgMixPairReq, 0, len(ke.SeenPRs)) + var pairing []byte + for i := range ke.SeenPRs { + seenPR := &ke.SeenPRs[i] + pr, ok := p.prs[*seenPR] + if !ok { + if uint32(i) == ke.Pos { + missingOwnPR = seenPR + } + continue + } + if uint32(i) == ke.Pos && pr.Identity != ke.Identity { + err := fmt.Errorf("KE identity does not match own PR " + + "at unmixed position") + return nil, ruleError(err) + } + if pairing == nil { + var err error + pairing, err = pr.Pairing() + if err != nil { + return nil, err + } + } else { + pairing2, err := pr.Pairing() + if err != nil { + return nil, err + } + if !bytes.Equal(pairing, pairing2) { + err := fmt.Errorf("referenced PRs are incompatible") + return nil, err + } + } + } + if missingOwnPR != nil { + p.orphans[*hash] = &orphan{ + message: ke, + accepted: time.Now(), + } + orphansByID := p.orphansByID[*id] + if orphansByID == nil { + orphansByID = make(map[chainhash.Hash]mixing.Message) + p.orphansByID[*id] = orphansByID + } + orphansByID[*hash] = ke + err := &MissingOwnPRError{ + MissingPR: *missingOwnPR, + } + return nil, err + } + + sid := ke.SessionID + ses := p.sessions[sid] + + // Create a session for the first run-0 KE + if ses == nil { + if ke.Run != 0 { + err := fmt.Errorf("unknown session for run-%d KE", + ke.Run) + return nil, err + } + + expiry := ^uint32(0) + for i := range prs { + prExpiry := prs[i].Expires() + if expiry > prExpiry { + expiry = prExpiry + } + } + ses = &session{ + sid: sid, + runs: make([]runstate, 0, 4), + expiry: expiry, + bc: broadcast{ch: make(chan struct{})}, + } + p.sessions[sid] = ses + } + + err = p.acceptEntry(ke, msgtypeKE, hash, id, ses) + if err != nil { + return nil, err + } + p.latestKE[*id] = ke + return ke, nil +} + +func (p *Pool) acceptEntry(msg mixing.Message, msgtype msgtype, hash *chainhash.Hash, + id *[33]byte, ses *session) error { + + run := msg.GetRun() + if run > uint32(len(ses.runs)) { + return fmt.Errorf("message skips runs") + } + + var rs *runstate + if msgtype == msgtypeKE && run == uint32(len(ses.runs)) { + // Add a runstate for the next run. + ses.runs = append(ses.runs, runstate{ + prs: msg.PrevMsgs(), + hashes: make(map[chainhash.Hash]struct{}), + }) + rs = &ses.runs[len(ses.runs)-1] + } else { + // Add to existing runstate + rs = &ses.runs[run] + } + + rs.hashes[*hash] = struct{}{} + e := entry{ + hash: *hash, + sid: ses.sid, + recvTime: time.Now(), + msg: msg, + msgtype: msgtype, + run: run, + } + p.pool[*hash] = e + p.messagesByIdentity[*id] = append(p.messagesByIdentity[*id], *hash) + + if cm, ok := msg.(*wire.MsgMixConfirm); ok { + p.sessionsByTxHash[cm.Mix.TxHash()] = ses + } + + rs.incrementCountFor(msgtype) + ses.bc.signal() + + return nil +} + +func confirmed(minConf, txHeight, curHeight int64) bool { + return confirms(txHeight, curHeight) >= minConf +} + +func confirms(txHeight, curHeight int64) int64 { + switch { + case txHeight == -1, txHeight > curHeight: + return 0 + default: + return curHeight - txHeight + 1 + } +} + +// isDustAmount determines whether a transaction output value and script length would +// cause the output to be considered dust. Transactions with dust outputs are +// not standard and are rejected by mempools with default policies. +func isDustAmount(amount int64, scriptSize int, relayFeePerKb int64) bool { + // Calculate the total (estimated) cost to the network. This is + // calculated using the serialize size of the output plus the serial + // size of a transaction input which redeems it. The output is assumed + // to be compressed P2PKH as this is the most common script type. Use + // the average size of a compressed P2PKH redeem input (165) rather than + // the largest possible (txsizes.RedeemP2PKHInputSize). + totalSize := 8 + 2 + wire.VarIntSerializeSize(uint64(scriptSize)) + + scriptSize + 165 + + // Dust is defined as an output value where the total cost to the network + // (output size + input size) is greater than 1/3 of the relay fee. + return amount*1000/(3*int64(totalSize)) < relayFeePerKb +} + +func checkFee(pr *wire.MsgMixPairReq, feeRate int64) error { + fee := pr.InputValue - int64(pr.MessageCount)*pr.MixAmount + if pr.Change != nil { + fee -= pr.Change.Value + } + + estimatedSize := estimateP2PKHv0SerializeSize(len(pr.UTXOs), + int(pr.MessageCount), pr.Change != nil) + requiredFee := feeForSerializeSize(feeRate, estimatedSize) + if fee < requiredFee { + return fmt.Errorf("not enough input value, or too low fee") + } + + return nil +} + +func feeForSerializeSize(relayFeePerKb int64, txSerializeSize int) int64 { + fee := relayFeePerKb * int64(txSerializeSize) / 1000 + + if fee == 0 && relayFeePerKb > 0 { + fee = relayFeePerKb + } + + const maxAmount = 21e6 * 1e8 + if fee < 0 || fee > maxAmount { + fee = maxAmount + } + + return fee +} + +const ( + redeemP2PKHv0SigScriptSize = 1 + 73 + 1 + 33 + p2pkhv0PkScriptSize = 1 + 1 + 1 + 20 + 1 + 1 +) + +func estimateP2PKHv0SerializeSize(inputs, outputs int, hasChange bool) int { + // Sum the estimated sizes of the inputs and outputs. + txInsSize := inputs * estimateInputSize(redeemP2PKHv0SigScriptSize) + txOutsSize := outputs * estimateOutputSize(p2pkhv0PkScriptSize) + + changeSize := 0 + if hasChange { + changeSize = estimateOutputSize(p2pkhv0PkScriptSize) + outputs++ + } + + // 12 additional bytes are for version, locktime and expiry. + return 12 + (2 * wire.VarIntSerializeSize(uint64(inputs))) + + wire.VarIntSerializeSize(uint64(outputs)) + + txInsSize + txOutsSize + changeSize +} + +// estimateInputSize returns the worst case serialize size estimate for a tx input +func estimateInputSize(scriptSize int) int { + return 32 + // previous tx + 4 + // output index + 1 + // tree + 8 + // amount + 4 + // block height + 4 + // block index + wire.VarIntSerializeSize(uint64(scriptSize)) + // size of script + scriptSize + // script itself + 4 // sequence +} + +// estimateOutputSize returns the worst case serialize size estimate for a tx output +func estimateOutputSize(scriptSize int) int { + return 8 + // previous tx + 2 + // version + wire.VarIntSerializeSize(uint64(scriptSize)) + // size of script + scriptSize // script itself +} diff --git a/mixing/mixpool/mixpool_test.go b/mixing/mixpool/mixpool_test.go new file mode 100644 index 0000000000..8c8aba3900 --- /dev/null +++ b/mixing/mixpool/mixpool_test.go @@ -0,0 +1,408 @@ +// Copyright (c) 2023-2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mixpool + +import ( + cryptorand "crypto/rand" + "encoding/hex" + "errors" + "flag" + "fmt" + "io" + "math/big" + "os" + "testing" + + "decred.org/cspp/v2/solverrpc" + "github.com/davecgh/go-spew/spew" + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/chaincfg/v3" + "github.com/decred/dcrd/crypto/blake256" + "github.com/decred/dcrd/dcrec/secp256k1/v4" + "github.com/decred/dcrd/dcrutil/v4" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/internal/chacha20prng" + "github.com/decred/dcrd/mixing/utxoproof" + "github.com/decred/dcrd/txscript/v4" + "github.com/decred/dcrd/wire" +) + +var params = chaincfg.SimNetParams() + +var seed [32]byte + +func testPRNG(t *testing.T) *chacha20prng.Reader { + t.Logf("PRNG seed: %x\n", seed) + return chacha20prng.New(seed[:], 0) +} + +var utxoStore struct { + byName map[string]*mockUTXO + byOutpoint map[wire.OutPoint]UtxoEntry +} + +func TestMain(m *testing.M) { + seedFlag := flag.String("seed", "", "use deterministic PRNG seed (32 bytes, hex)") + flag.Parse() + if *seedFlag != "" { + b, err := hex.DecodeString(*seedFlag) + if err != nil { + fmt.Fprintln(os.Stderr, "invalid -seed:", err) + os.Exit(1) + } + if len(b) != 32 { + fmt.Fprintln(os.Stderr, "invalid -seed: must be 32 bytes") + os.Exit(1) + } + copy(seed[:], b) + } else { + cryptorand.Read(seed[:]) + } + + rand := chacha20prng.New(seed[:], 0) + + utxoStore.byName = makeMockUTXOs(rand) + utxoStore.byOutpoint = make(map[wire.OutPoint]UtxoEntry) + for _, m := range utxoStore.byName { + utxoStore.byOutpoint[m.outpoint] = m + } + + os.Exit(m.Run()) +} + +func generateSecp256k1(rand io.Reader) (*secp256k1.PublicKey, *secp256k1.PrivateKey, error) { + if rand == nil { + rand = cryptorand.Reader + } + + privateKey, err := secp256k1.GeneratePrivateKeyFromRand(rand) + if err != nil { + return nil, nil, err + } + + publicKey := privateKey.PubKey() + + return publicKey, privateKey, nil +} + +type mockUTXO struct { + outpoint wire.OutPoint + tx *wire.MsgTx + output *wire.TxOut + blockHeight int64 + pubkey []byte + privkey *secp256k1.PrivateKey +} + +func (m *mockUTXO) IsSpent() bool { return false } +func (m *mockUTXO) PkScript() []byte { return m.output.PkScript } +func (m *mockUTXO) ScriptVersion() uint16 { return 0 } +func (m *mockUTXO) BlockHeight() int64 { return m.blockHeight } +func (m *mockUTXO) Amount() int64 { return m.output.Value } + +func makeMockUTXO(rand io.Reader, value int64) *mockUTXO { + tx := wire.NewMsgTx() + + pub, priv, err := generateSecp256k1(rand) + if err != nil { + panic(err) + } + pubSerialized := pub.SerializeCompressed() + + p2pkhScript := []byte{ + 0: txscript.OP_DUP, + 1: txscript.OP_HASH160, + 2: txscript.OP_DATA_20, + 23: txscript.OP_EQUALVERIFY, + 24: txscript.OP_CHECKSIG, + } + hash160 := dcrutil.Hash160(pubSerialized) + copy(p2pkhScript[3:23], hash160) + + output := wire.NewTxOut(value, p2pkhScript) + tx.AddTxOut(output) + + return &mockUTXO{ + outpoint: wire.OutPoint{ + Hash: tx.TxHash(), + Index: 0, + Tree: 0, + }, + tx: tx, + output: output, + pubkey: pubSerialized, + privkey: priv, + } +} + +func (m *mockUTXO) mixprutxo(expires uint32) wire.MixPairReqUTXO { + k := utxoproof.Secp256k1KeyPair{ + Pub: m.pubkey, + Priv: m.privkey, + } + sig, err := k.SignUtxoProof(expires) + if err != nil { + panic(err) + } + + return wire.MixPairReqUTXO{ + OutPoint: m.outpoint, + PubKey: m.pubkey, + Signature: sig, + } +} + +func makeMockUTXOs(rand io.Reader) map[string]*mockUTXO { + return map[string]*mockUTXO{ + "A": makeMockUTXO(rand, 20e8), + } +} + +type fakechain struct { + hash chainhash.Hash + height int64 +} + +func (c *fakechain) ChainParams() *chaincfg.Params { + return params +} + +func (c *fakechain) CurrentTip() (chainhash.Hash, int64) { + return c.hash, c.height +} + +func (c *fakechain) FetchUtxoEntry(op wire.OutPoint) (UtxoEntry, error) { + entry := utxoStore.byOutpoint[op] + if entry == nil { + return nil, errors.New("no utxo entry") + } + return entry, nil +} + +func TestAccept(t *testing.T) { + t.Parallel() + testRand := testPRNG(t) + + c := new(fakechain) + c.height = 1000 + p := NewPool(c) + + identityPub, identityPriv, err := generateSecp256k1(testRand) + if err != nil { + t.Fatal(err) + } + identity := *(*[33]byte)(identityPub.SerializeCompressed()) + + h := blake256.New() + + var ( + expires uint32 = 1010 + mixAmount int64 = 10e8 + scriptClass = mixing.ScriptClassP2PKHv0 + txVersion uint16 = wire.TxVersion + lockTime uint32 = 0 + messageCount uint32 = 1 + inputValue int64 = 20e8 + utxos []wire.MixPairReqUTXO + change *wire.TxOut + flags byte = 0 + pairingFlags byte = 0 + ) + utxos = []wire.MixPairReqUTXO{ + utxoStore.byName["A"].mixprutxo(expires), + } + pr, err := wire.NewMsgMixPairReq(identity, expires, mixAmount, + string(scriptClass), txVersion, lockTime, messageCount, + inputValue, utxos, change, flags, pairingFlags) + if err != nil { + t.Fatal(err) + } + pr.WriteHash(h) + err = mixing.SignMessage(pr, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(pr) + if err != nil { + t.Fatal(err) + } + + prngSeed := testRand.Next(32) + runPRNG := chacha20prng.New(prngSeed, 0) + kx, err := mixing.NewKX(runPRNG) + if err != nil { + t.Fatal(err) + } + + // Generate unpadded SR and DC messages. + var msize uint32 = 20 + var mcount uint32 = 2 + srMsg := make([]*big.Int, mcount) + for i := range srMsg { + srMsg[i], err = cryptorand.Int(testRand, mixing.F) + if err != nil { + t.Fatal(err) + } + } + dcMsg := make([][]byte, mcount) + for i := range dcMsg { + dcMsg[i] = testRand.Next(int(msize)) + } + t.Logf("SR messages %+x\n", srMsg) + t.Logf("DC messages %+x\n", dcMsg) + + var ( + seenPRs = []chainhash.Hash{pr.Hash()} + epoch uint64 = 0 + sid [32]byte = mixing.SortPRsForSession([]*wire.MsgMixPairReq{pr}, epoch) + run uint32 = 0 + pos uint32 = 0 + ecdh [33]byte = *(*[33]byte)(kx.ECDHPublicKey.SerializeCompressed()) + pqpk [1218]byte = *kx.PQPublicKey + commitment [32]byte // XXX: hash of RS message + ) + ke := wire.NewMsgMixKeyExchange(identity, sid, epoch, run, pos, ecdh, + pqpk, commitment, seenPRs) + ke.WriteHash(h) + err = mixing.SignMessage(ke, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(ke) + if err != nil { + t.Fatal(err) + } + + pqPubkeys := []*mixing.PQPublicKey{kx.PQPublicKey} + ciphertexts, err := kx.Encapsulate(runPRNG, pqPubkeys, 0) + if err != nil { + t.Fatal(err) + } + + seenKEs := []chainhash.Hash{ke.Hash()} + ct := wire.NewMsgMixCiphertexts(identity, sid, run, ciphertexts, seenKEs) + ct.WriteHash(h) + err = mixing.SignMessage(ct, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(ct) + if err != nil { + t.Fatal(err) + } + + mcounts := []uint32{mcount} + revealedKeys := &mixing.RevealedKeys{ + ECDHPublicKeys: []*secp256k1.PublicKey{kx.ECDHPublicKey}, + Ciphertexts: ciphertexts, + MyIndex: 0, + } + secrets, err := kx.SharedSecrets(revealedKeys, sid[:], run, mcounts) + if err != nil { + t.Fatal(err) + } + + // Pad SR messages + srmix := make([][]*big.Int, mcount) + myStart := uint32(0) + for i := uint32(0); i < mcount; i++ { + pads := mixing.SRMixPads(secrets.SRSecrets[i], myStart+i) + padded := mixing.SRMix(srMsg[i], pads) + srmix[i] = make([]*big.Int, len(padded)) + copy(srmix[i], padded) + } + srmixBytes := make([][][]byte, len(srmix)) + for i := range srmix { + srmixBytes[i] = make([][]byte, len(srmix[i])) + for j := range srmix[i] { + srmixBytes[i][j] = srmix[i][j].Bytes() + } + } + t.Logf("SR mix %+x\n", srmix) + + seenCTs := []chainhash.Hash{ct.Hash()} + sr := wire.NewMsgMixSlotReserve(identity, sid, run, srmixBytes, seenCTs) + sr.WriteHash(h) + err = mixing.SignMessage(sr, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(sr) + if err != nil { + t.Fatal(err) + } + + vs := srmix + powerSums := mixing.AddVectors(vs...) + coeffs := mixing.Coefficients(powerSums) + if err := solverrpc.StartSolver(); err != nil { + t.Skipf("Cannot continue test; unable to start solver: %v", err) + } + roots, err := solverrpc.Roots(coeffs, mixing.F) + if err != nil { + t.Fatal(err) + } + t.Logf("solved roots %+x\n", roots) + + // Pad DC messages + dcmix := make([]wire.MixVect, mcount) + var slots []int + for i := 0; i < int(mcount); i++ { + slots = append(slots, i) + } + for i, slot := range slots { + my := myStart + uint32(i) + pads := mixing.DCMixPads(secrets.DCSecrets[i], my) + dcmix[i] = wire.MixVect(mixing.DCMix(pads, dcMsg[i], uint32(slot))) + } + + seenSRs := []chainhash.Hash{sr.Hash()} + dc := wire.NewMsgMixDCNet(identity, sid, run, dcmix, seenSRs) + dc.WriteHash(h) + err = mixing.SignMessage(dc, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(dc) + if err != nil { + t.Fatal(err) + } + + dcVecs := make([]mixing.Vec, 0, len(dcmix)) + for _, vec := range dcmix { + dcVecs = append(dcVecs, mixing.Vec(vec)) + } + res := mixing.XorVectors(dcVecs) + t.Logf("recovered message set %v", res.String()) + + tx := wire.NewMsgTx() + for i := range res { + hash160 := res[i][:] + pkscript := []byte{ + 0: txscript.OP_DUP, + 1: txscript.OP_HASH160, + 2: txscript.OP_DATA_20, + 23: txscript.OP_EQUALVERIFY, + 24: txscript.OP_CHECKSIG, + } + copy(pkscript[3:23], hash160) + tx.AddTxOut(wire.NewTxOut(mixAmount, pkscript)) + } + t.Logf("mixed tx hash %v", tx.TxHash()) + + seenDCs := []chainhash.Hash{dc.Hash()} + cm := wire.NewMsgMixConfirm(identity, sid, run, tx, seenDCs) + cm.WriteHash(h) + err = mixing.SignMessage(cm, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(cm) + if err != nil { + t.Fatal(err) + } + + t.Logf("%s", spew.Sdump(tx)) +} diff --git a/mixing/mixpool/orphans_test.go b/mixing/mixpool/orphans_test.go new file mode 100644 index 0000000000..4374349bd8 --- /dev/null +++ b/mixing/mixpool/orphans_test.go @@ -0,0 +1,262 @@ +// Copyright (c) 2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mixpool + +import ( + "errors" + "reflect" + "testing" + + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/chaincfg/v3" + "github.com/decred/dcrd/crypto/blake256" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/wire" +) + +var ( + testStartingHeight uint32 = 100 + testStartingBlock = chainhash.Hash{100} +) + +var testnetParams = chaincfg.TestNet3Params() + +type testBlockchain struct{} + +func newTestBlockchain() *testBlockchain { + return &testBlockchain{} +} + +func (b *testBlockchain) CurrentTip() (chainhash.Hash, int64) { + return testStartingBlock, int64(testStartingHeight) +} + +func (b *testBlockchain) ChainParams() *chaincfg.Params { + return testnetParams +} + +// Intentionally create orphans and test their acceptance behavior when PRs +// and KEs are accepted. +func TestOrphans(t *testing.T) { + pub, priv, err := generateSecp256k1(nil) + if err != nil { + t.Fatal(err) + } + id := *(*[33]byte)(pub.SerializeCompressed()) + + h := blake256.New() + + pr := &wire.MsgMixPairReq{ + Identity: id, + UTXOs: []wire.MixPairReqUTXO{ + {}, + }, + MessageCount: 1, + Expiry: testStartingHeight + 10, + ScriptClass: string(mixing.ScriptClassP2PKHv0), + InputValue: 1 << 18, + } + err = mixing.SignMessage(pr, priv) + if err != nil { + t.Fatal(err) + } + pr.WriteHash(h) + + prs := []*wire.MsgMixPairReq{pr} + epoch := uint64(1704067200) + sid := mixing.SortPRsForSession(prs, epoch) + ke1 := &wire.MsgMixKeyExchange{ + Identity: id, + SeenPRs: []chainhash.Hash{ + pr.Hash(), + }, + SessionID: sid, + Epoch: epoch, + Run: 0, + } + err = mixing.SignMessage(ke1, priv) + if err != nil { + t.Fatal(err) + } + ke1.WriteHash(h) + + ke2 := &wire.MsgMixKeyExchange{ + Identity: id, + SeenPRs: []chainhash.Hash{ + pr.Hash(), + }, + SessionID: sid, + Epoch: epoch, + Run: 1, + } + err = mixing.SignMessage(ke2, priv) + if err != nil { + t.Fatal(err) + } + ke2.WriteHash(h) + + fp1 := &wire.MsgMixFactoredPoly{ + Identity: id, + SessionID: sid, + Run: 0, + } + err = mixing.SignMessage(fp1, priv) + if err != nil { + t.Fatal(err) + } + fp1.WriteHash(h) + + fp2 := &wire.MsgMixFactoredPoly{ + Identity: id, + SessionID: sid, + Run: 1, + } + err = mixing.SignMessage(fp2, priv) + if err != nil { + t.Fatal(err) + } + fp2.WriteHash(h) + + t.Logf("pr %s", pr.Hash()) + t.Logf("ke1 %s", ke1.Hash()) + t.Logf("ke2 %s", ke2.Hash()) + t.Logf("fp1 %s", fp1.Hash()) + t.Logf("fp2 %s", fp2.Hash()) + + // Create a pair request, several KEs, and later messages belong to + // the session and run increment for each KE. Test different + // combinations of acceptance order to test orphan processing of + // various message types. + type accept struct { + desc string + message mixing.Message + errors bool + errAs interface{} + accepted []mixing.Message + } + tests := [][]accept{ + // Accept KE, then PR, then FP + 0: {{ + desc: "accept KE before PR", + message: ke1, + errors: true, + errAs: new(MissingOwnPRError), + accepted: nil, + }, { + desc: "accept PR after KE; both should now process", + message: pr, + errors: false, + accepted: []mixing.Message{pr, ke1}, + }, { + desc: "accept future message in accepted KE session/run", + message: fp1, + errors: false, // maybe later. + accepted: []mixing.Message{fp1}, + }}, + + // Accept FP, then KE, then PR + 1: {{ + desc: "accept FP first", + message: fp1, + errors: false, + accepted: nil, + }, { + desc: "accept KE", + message: ke1, + errors: true, + errAs: new(MissingOwnPRError), + accepted: nil, + }, { + desc: "accept PR; all should now be processed", + message: pr, + errors: false, + accepted: []mixing.Message{pr, ke1, fp1}, + }}, + + // Accept PR, then FP1, then FP2, then KE1, then KE2. + 2: {{ + desc: "accept PR first", + message: pr, + errors: false, + accepted: []mixing.Message{pr}, + }, { + desc: "accept FP1", + message: fp1, + errors: false, + accepted: nil, + }, { + desc: "accept FP2", + message: fp2, + errors: false, + accepted: nil, + }, { + desc: "accept KE1", + message: ke1, + errors: false, + accepted: []mixing.Message{ke1, fp1}, + }, { + desc: "accept KE2", + message: ke2, + errors: false, + accepted: []mixing.Message{ke2, fp2}, + }}, + + 3: {{ + desc: "accept FP1", + message: fp1, + errors: false, + accepted: nil, + }, { + desc: "accept FP2", + message: fp2, + errors: false, + accepted: nil, + }, { + desc: "accept KE1", + message: ke1, + errors: true, + errAs: new(MissingOwnPRError), + accepted: nil, + }, { + desc: "accept KE2", + message: ke2, + errors: true, + errAs: new(MissingOwnPRError), + accepted: nil, + }, { + desc: "accept PR last", + message: pr, + errors: false, + accepted: []mixing.Message{pr, ke1, ke2, fp1, fp2}, + }}, + } + + for i, accepts := range tests { + t.Logf("test %d", i) + mp := NewPool(newTestBlockchain()) + + for j, a := range accepts { + accepted, err := mp.AcceptMessage(a.message) + if err != nil != a.errors { + t.Errorf("test %d call %d %q: unexpected error: %v", i, j, a.desc, err) + } + if a.errors && !errors.As(err, &a.errAs) { + t.Errorf("test %d call %d %q: unexpected error: %v", i, j, a.desc, err) + } + if len(accepted) != len(a.accepted) { + t.Logf("orphans: %v", mp.orphans) + t.Logf("orphansByID: %v", mp.orphansByID) + t.Logf("pr: %v", pr) + t.Logf("ke2: %v", ke2) + t.Logf("fp2: %v", fp2) + t.Errorf("test %d call %d %q: accepted lengths differ %d != %d", i, j, a.desc, + len(accepted), len(a.accepted)) + } + if !reflect.DeepEqual(accepted, a.accepted) { + t.Errorf("test %d call %d %q: accepted messages differs: %#v", i, j, a.desc, accepted) + } + } + } +} diff --git a/mixing/utxoproof/utxoproof.go b/mixing/utxoproof/utxoproof.go new file mode 100644 index 0000000000..e0a33ec38e --- /dev/null +++ b/mixing/utxoproof/utxoproof.go @@ -0,0 +1,90 @@ +// Copyright (c) 2023 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package utxoproof + +import ( + "encoding/binary" + + "github.com/decred/dcrd/crypto/blake256" + "github.com/decred/dcrd/dcrec/secp256k1/v4" + "github.com/decred/dcrd/dcrec/secp256k1/v4/schnorr" +) + +// Tags and schemes describing the message being signed. +// +// These strings must not contain the comma, which is reserved as a separator +// character. +const ( + tag = "mixpr-utxoproof" + + // schemes + secp256k1P2PKH = "P2PKH(EC-Schnorr-DCRv0)" +) + +var sep = []byte{','} + +// The signature hash is created from the serialization of: +// tag , scheme , expiry pubkey +// No separator is written after expiry; it is fixed length. + +// Secp256k1KeyPair provides access to the serialized public key and parsed +// private key of a secp256k1 key pair. +type Secp256k1KeyPair struct { + Pub []byte + Priv *secp256k1.PrivateKey +} + +// SignUtxoProof returns the UTXO proof of ownership over an output controlled +// by the keypair. The UTXO proof is only valid for the provided expiry +// height to prevent its inclusion in other PR messages signed by an unrelated +// identity. +func (k *Secp256k1KeyPair) SignUtxoProof(expires uint32) ([]byte, error) { + const scheme = secp256k1P2PKH + + h := blake256.New() + h.Write([]byte(tag)) + h.Write(sep) + h.Write([]byte(scheme)) + h.Write(sep) + expiresBytes := binary.BigEndian.AppendUint32(make([]byte, 0, 4), expires) + h.Write(expiresBytes) + h.Write(k.Pub) + hash := h.Sum(nil) + + sig, err := schnorr.Sign(k.Priv, hash) + if err != nil { + return nil, err + } + + return sig.Serialize(), nil +} + +// ValidateSecp256k1P2PKH validates the UTXO proof of an output controlled by +// a secp256k1 keypair for the given expiry height. Returns true only if the +// proof is valid. +func ValidateSecp256k1P2PKH(pubkey, proof []byte, expires uint32) bool { + const scheme = secp256k1P2PKH + + pubkeyParsed, err := secp256k1.ParsePubKey(pubkey) + if err != nil { + return false + } + proofParsed, err := schnorr.ParseSignature(proof) + if err != nil { + return false + } + + h := blake256.New() + h.Write([]byte(tag)) + h.Write(sep) + h.Write([]byte(scheme)) + h.Write(sep) + expiresBytes := binary.BigEndian.AppendUint32(make([]byte, 0, 4), expires) + h.Write(expiresBytes) + h.Write(pubkey) + hash := h.Sum(nil) + + return proofParsed.Verify(hash, pubkeyParsed) +}