diff --git a/mixing/go.mod b/mixing/go.mod index f7624ddb3f..128b83241b 100644 --- a/mixing/go.mod +++ b/mixing/go.mod @@ -3,16 +3,27 @@ module github.com/decred/dcrd/mixing go 1.17 require ( + decred.org/cspp/v2 v2.2.0 github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a + github.com/davecgh/go-spew v1.1.1 github.com/decred/dcrd/chaincfg/chainhash v1.0.4 github.com/decred/dcrd/chaincfg/v3 v3.2.0 github.com/decred/dcrd/crypto/blake256 v1.0.1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 + github.com/decred/dcrd/dcrutil/v4 v4.0.1 + github.com/decred/dcrd/txscript/v4 v4.1.0 github.com/decred/dcrd/wire v1.7.0 + github.com/decred/slog v1.2.0 golang.org/x/crypto v0.23.0 ) require ( + github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect + github.com/dchest/siphash v1.2.3 // indirect + github.com/decred/base58 v1.0.5 // indirect + github.com/decred/dcrd/crypto/ripemd160 v1.0.2 // indirect + github.com/decred/dcrd/dcrec v1.0.1 // indirect + github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect golang.org/x/sys v0.20.0 // indirect lukechampine.com/blake3 v1.3.0 // indirect diff --git a/mixing/go.sum b/mixing/go.sum index e95c41b45f..5a81ac80a5 100644 --- a/mixing/go.sum +++ b/mixing/go.sum @@ -1,27 +1,48 @@ +decred.org/cspp/v2 v2.2.0 h1:VSOUC1w0Wo+QOGS0r1XO6TLnO16X67KuvpDmRRYyr08= +decred.org/cspp/v2 v2.2.0/go.mod h1:9nO3bfvCheOPIFZw5f6sRQ42CjBFB5RKSaJ9Iq6G4MA= +github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI= +github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0= github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a h1:clYxJ3Os0EQUKDDVU8M0oipllX0EkuFNBfhVQuIfyF0= github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a/go.mod h1:z/9Ck1EDixEbBbZ2KH2qNHekEmDLTOZ+FyoIPWWSVOI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= +github.com/decred/base58 v1.0.5 h1:hwcieUM3pfPnE/6p3J100zoRfGkQxBulZHo7GZfOqic= +github.com/decred/base58 v1.0.5/go.mod h1:s/8lukEHFA6bUQQb/v3rjUySJ2hu+RioCzLukAVkrfw= github.com/decred/dcrd/chaincfg/chainhash v1.0.4 h1:zRCv6tdncLfLTKYqu7hrXvs7hW+8FO/NvwoFvGsrluU= github.com/decred/dcrd/chaincfg/chainhash v1.0.4/go.mod h1:hA86XxlBWwHivMvxzXTSD0ZCG/LoYsFdWnCekkTMCqY= github.com/decred/dcrd/chaincfg/v3 v3.2.0 h1:6WxA92AGBkycEuWvxtZMvA76FbzbkDRoK8OGbsR2muk= github.com/decred/dcrd/chaincfg/v3 v3.2.0/go.mod h1:2rHW1TKyFmwZTVBLoU/Cmf0oxcpBjUEegbSlBfrsriI= github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= +github.com/decred/dcrd/crypto/ripemd160 v1.0.2 h1:TvGTmUBHDU75OHro9ojPLK+Yv7gDl2hnUvRocRCjsys= +github.com/decred/dcrd/crypto/ripemd160 v1.0.2/go.mod h1:uGfjDyePSpa75cSQLzNdVmWlbQMBuiJkvXw/MNKRY4M= +github.com/decred/dcrd/dcrec v1.0.1 h1:gDzlndw0zYxM5BlaV17d7ZJV6vhRe9njPBFeg4Db2UY= +github.com/decred/dcrd/dcrec v1.0.1/go.mod h1:CO+EJd8eHFb8WHa84C7ZBkXsNUIywaTHb+UAuI5uo6o= +github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 h1:l/lhv2aJCUignzls81+wvga0TFlyoZx8QxRMQgXpZik= +github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3/go.mod h1:AKpV6+wZ2MfPRJnTbQ6NPgWrKzbe9RCIlCF/FKzMtM8= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/decred/dcrd/dcrutil/v4 v4.0.1 h1:E+d2TNbpOj0f1L9RqkZkEm1QolFjajvkzxWC5WOPf1s= +github.com/decred/dcrd/dcrutil/v4 v4.0.1/go.mod h1:7EXyHYj8FEqY+WzMuRkF0nh32ueLqhutZDoW4eQ+KRc= +github.com/decred/dcrd/txscript/v4 v4.1.0 h1:uEdcibIOl6BuWj3AqmXZ9xIK/qbo6lHY9aNk29FtkrU= +github.com/decred/dcrd/txscript/v4 v4.1.0/go.mod h1:OVguPtPc4YMkgssxzP8B6XEMf/J3MB6S1JKpxgGQqi0= github.com/decred/dcrd/wire v1.6.0/go.mod h1:XQ8Xv/pN/3xaDcb7sH8FBLS9cdgVctT7HpBKKGsIACk= github.com/decred/dcrd/wire v1.7.0 h1:5JHiDjEQeS4XUl4PfnTZYLwAD/E/+LwBmPRec/fP76o= github.com/decred/dcrd/wire v1.7.0/go.mod h1:lAqrzV0SU4kyV6INLEJgDtUjJaTaVKrbF4LHtaYl+zU= +github.com/decred/slog v1.2.0 h1:soHAxV52B54Di3WtKLfPum9OFfWqwtf/ygf9njdfnPM= +github.com/decred/slog v1.2.0/go.mod h1:kVXlGnt6DHy2fV5OjSeuvCJ0OmlmTF6LFpEPMu/fOY0= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jrick/wsrpc/v2 v2.3.5/go.mod h1:7oBeDM/xMF6Yqy4GDAjpppuOf1hm6lWsaG3EaMrm+aA= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -31,6 +52,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -38,20 +61,29 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/mixing/mixpool/errors.go b/mixing/mixpool/errors.go new file mode 100644 index 0000000000..8beae56641 --- /dev/null +++ b/mixing/mixpool/errors.go @@ -0,0 +1,87 @@ +// Copyright (c) 2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mixpool + +import ( + "errors" + + "github.com/decred/dcrd/chaincfg/chainhash" +) + +// RuleError represents a mixpool rule violation. +// +// RuleErrors can be treated as a bannable offense. +type RuleError struct { + Err error +} + +func ruleError(err error) *RuleError { + return &RuleError{Err: err} +} + +func (e *RuleError) Error() string { + return e.Err.Error() +} + +func (e *RuleError) Unwrap() error { + return e.Err +} + +// Errors wrapped by RuleError. +var ( + // ErrChangeDust is returned by AcceptMessage if a pair request's + // change amount is dust. + ErrChangeDust = errors.New("change output is dust") + + // ErrInvalidMessageCount is returned by AcceptMessage if a + // pair request contains an invalid message count. + ErrInvalidMessageCount = errors.New("message count must be positive") + + // ErrInvalidScript is returned by AcceptMessage if a pair request + // contains an invalid script. + ErrInvalidScript = errors.New("invalid script") + + // ErrInvalidSessionID is returned by AcceptMessage if the message + // contains an invalid session id. + ErrInvalidSessionID = errors.New("invalid session ID") + + // ErrInvalidSignature is returned by AcceptMessage if the message is + // not properly signed for the claimed identity. + ErrInvalidSignature = errors.New("invalid message signature") + + // ErrInvalidTotalMixAmount is returned by AcceptMessage if a pair + // request contains the product of the message count and mix amount + // that exceeds the total input value. + ErrInvalidTotalMixAmount = errors.New("invalid total mix amount") + + // ErrInvalidUTXOProof is returned by AcceptMessage if a pair request + // fails to prove ownership of each utxo. + ErrInvalidUTXOProof = errors.New("invalid UTXO ownership proof") + + // ErrMissingUTXOs is returned by AcceptMessage if a pair request + // message does not reference any previous UTXOs. + ErrMissingUTXOs = errors.New("pair request contains no UTXOs") +) + +var ( + // ErrSecretsRevealed is returned by Receive if any peer has + // unexpectedly revealed their secrets during a run stage + // (Received.RSs field is nil). This requires all other peers to quit + // the run, reveal their secrets, and perform blame assignment. + ErrSecretsRevealed = errors.New("secrets revealed by peer") +) + +// MissingOwnPRError represents the error condition where a key exchange +// message cannot be accepted to the mixpool due to it not referencing the +// owner's own pair request. The KE is recorded as an orphan and may be +// processed later. The error contains all unknown PR hashes so they can be +// fetched from another instance. +type MissingOwnPRError struct { + MissingPR chainhash.Hash +} + +func (e *MissingOwnPRError) Error() string { + return "KE identity's own PR is missing from mixpool" +} diff --git a/mixing/mixpool/log.go b/mixing/mixpool/log.go new file mode 100644 index 0000000000..22d6482844 --- /dev/null +++ b/mixing/mixpool/log.go @@ -0,0 +1,22 @@ +// Copyright (c) 2020-2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mixpool + +import ( + "github.com/decred/slog" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +// The default amount of logging is none. +var log = slog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using slog. +func UseLogger(logger slog.Logger) { + log = logger +} diff --git a/mixing/mixpool/mixpool.go b/mixing/mixpool/mixpool.go new file mode 100644 index 0000000000..625109f27c --- /dev/null +++ b/mixing/mixpool/mixpool.go @@ -0,0 +1,1550 @@ +// Copyright (c) 2023-2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +// Package mixpool provides an in-memory pool of mixing messages for full nodes +// that relay these messages and mixing wallets that send and receive them. +package mixpool + +import ( + "bytes" + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/chaincfg/v3" + "github.com/decred/dcrd/dcrutil/v4" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/utxoproof" + "github.com/decred/dcrd/txscript/v4" + "github.com/decred/dcrd/txscript/v4/stdscript" + "github.com/decred/dcrd/wire" +) + +const minconf = 2 +const feeRate = 0.0001e8 + +type idPubKey = [33]byte + +type msgtype int + +// Message type constants, for quickly checking looked up entries by message +// hash match the expected type (without performing a type assertion). +// Excludes PR. +const ( + msgtypeKE msgtype = 1 + iota + msgtypeCT + msgtypeSR + msgtypeDC + msgtypeCM + msgtypeFP + msgtypeRS + + nmsgtypes = msgtypeRS +) + +func (m msgtype) String() string { + switch m { + case msgtypeKE: + return "KE" + case msgtypeCT: + return "CT" + case msgtypeSR: + return "SR" + case msgtypeDC: + return "DC" + case msgtypeCM: + return "CM" + case msgtypeFP: + return "FP" + case msgtypeRS: + return "RS" + default: + return "?" + } +} + +// entry describes non-PR messages accepted to the pool. +type entry struct { + hash chainhash.Hash + sid [32]byte + recvTime time.Time + msg mixing.Message + msgtype msgtype + run uint32 +} + +type orphan struct { + message mixing.Message + accepted time.Time +} + +type session struct { + sid [32]byte + runs []runstate + expiry uint32 + bc broadcast +} + +type runstate struct { + prs []chainhash.Hash + counts [nmsgtypes]uint32 + hashes map[chainhash.Hash]struct{} +} + +func (r *runstate) countFor(t msgtype) uint32 { + return r.counts[t-1] +} + +func (r *runstate) incrementCountFor(t msgtype) { + r.counts[t-1]++ +} + +type broadcast struct { + ch chan struct{} + mu sync.Mutex +} + +// wait returns the wait channel that is closed whenever a message is received +// for a session. Waiters must acquire the pool lock before reading messages. +func (b *broadcast) wait() <-chan struct{} { + b.mu.Lock() + ch := b.ch + b.mu.Unlock() + + return ch +} + +func (b *broadcast) signal() { + b.mu.Lock() + close(b.ch) + b.ch = make(chan struct{}) + b.mu.Unlock() +} + +// Pool records in-memory mix messages that have been broadcast over the +// peer-to-peer network. +type Pool struct { + mtx sync.RWMutex + prs map[chainhash.Hash]*wire.MsgMixPairReq + outPoints map[wire.OutPoint]chainhash.Hash + pool map[chainhash.Hash]entry + orphans map[chainhash.Hash]*orphan + orphansByID map[idPubKey]map[chainhash.Hash]mixing.Message + messagesByIdentity map[idPubKey][]chainhash.Hash + latestKE map[idPubKey]*wire.MsgMixKeyExchange + sessions map[[32]byte]*session + sessionsByTxHash map[chainhash.Hash]*session + epoch time.Duration + expireHeight uint32 + expireSem chan struct{} + + blockchain BlockChain + utxoFetcher UtxoFetcher + feeRate int64 + params *chaincfg.Params +} + +// UtxoEntry provides details regarding unspent transaction outputs. +type UtxoEntry interface { + IsSpent() bool + PkScript() []byte + ScriptVersion() uint16 + BlockHeight() int64 + Amount() int64 +} + +// UtxoFetcher defines methods used to validate unspent transaction outputs in +// the pair request message. It is optional, but should be implemented by full +// nodes that have this capability to detect and stop relay of spam and junk +// messages. +type UtxoFetcher interface { + // FetchUtxoEntry defines the function to use to fetch unspent + // transaction output information. + FetchUtxoEntry(wire.OutPoint) (UtxoEntry, error) +} + +// BlockChain queries the current status of the blockchain. Its methods should +// be able to be implemented by both full nodes and SPV wallets. +type BlockChain interface { + // ChainParams identifies which chain parameters the mixing pool is + // associated with. + ChainParams() *chaincfg.Params + + // CurrentTip returns the hash and height of the current tip block. + CurrentTip() (chainhash.Hash, int64) +} + +// NewPool returns a new mixing pool that accepts and validates mixing messages +// required for distributed transaction mixing. +func NewPool(blockchain BlockChain) *Pool { + pool := &Pool{ + prs: make(map[chainhash.Hash]*wire.MsgMixPairReq), + outPoints: make(map[wire.OutPoint]chainhash.Hash), + pool: make(map[chainhash.Hash]entry), + orphans: make(map[chainhash.Hash]*orphan), + orphansByID: make(map[idPubKey]map[chainhash.Hash]mixing.Message), + messagesByIdentity: make(map[idPubKey][]chainhash.Hash), + latestKE: make(map[idPubKey]*wire.MsgMixKeyExchange), + sessions: make(map[[32]byte]*session), + sessionsByTxHash: make(map[chainhash.Hash]*session), + epoch: 10 * time.Minute, // XXX: mainnet epoch: add to chainparams + expireHeight: 0, + expireSem: make(chan struct{}, 1), + blockchain: blockchain, + feeRate: feeRate, + params: blockchain.ChainParams(), + } + // XXX: add epoch to chainparams + if blockchain.ChainParams().Net == wire.TestNet3 { + pool.epoch = 3 * time.Minute + } + + if u, ok := blockchain.(UtxoFetcher); ok { + pool.utxoFetcher = u + } + return pool +} + +// Epoch returns the duration between mix epochs. +func (p *Pool) Epoch() time.Duration { + return p.epoch +} + +// MixPRHashes returns the hashes of all MixPR messages recorded by the pool. +// This data is provided to peers requesting initial state of the mixpool. +func (p *Pool) MixPRHashes() []chainhash.Hash { + p.mtx.RLock() + hashes := make([]chainhash.Hash, 0, len(p.prs)) + for hash := range p.prs { + hashes = append(hashes, hash) + } + p.mtx.RUnlock() + + return hashes +} + +// Message searches the mixing pool for a message by its hash. +func (p *Pool) Message(query *chainhash.Hash) (mixing.Message, error) { + p.mtx.RLock() + pr := p.prs[*query] + e, ok := p.pool[*query] + p.mtx.RUnlock() + if pr != nil { + return pr, nil + } + if !ok || e.msg == nil { + return nil, fmt.Errorf("message not found") + } + return e.msg, nil +} + +// HaveMessage checks whether the mixing pool contains a message by its hash. +func (p *Pool) HaveMessage(query *chainhash.Hash) bool { + p.mtx.RLock() + _, ok := p.pool[*query] + if !ok { + _, ok = p.prs[*query] + } + p.mtx.RUnlock() + return ok +} + +// MixPR searches the mixing pool for a PR message by its hash. +func (p *Pool) MixPR(query *chainhash.Hash) (*wire.MsgMixPairReq, error) { + var pr *wire.MsgMixPairReq + + p.mtx.RLock() + pr = p.prs[*query] + p.mtx.RUnlock() + + if pr == nil { + return nil, fmt.Errorf("PR message not found") + } + + return pr, nil +} + +// MixPRs returns all MixPR messages with hashes matching the query. Unknown +// messages are ignored. +// +// If query is nil, all PRs are returned. +// +// In both cases, any expired PRs that are still internally tracked by the +// mixpool for ongoing sessions are excluded from the result set. +func (p *Pool) MixPRs(query []chainhash.Hash) []*wire.MsgMixPairReq { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.removeConfirmedRuns() + + if query == nil { + res := make([]*wire.MsgMixPairReq, 0, len(p.prs)) + for _, pr := range p.prs { + // Exclude expired but not yet removed PRs. + if pr.Expiry <= p.expireHeight { + continue + } + + res = append(res, pr) + } + return res + } + + res := make([]*wire.MsgMixPairReq, 0, len(query)) + for i := range query { + pr, ok := p.prs[query[i]] + if ok { + // Exclude expired but not yet removed PRs. + if pr.Expiry <= p.expireHeight { + continue + } + + res = append(res, pr) + } + } + + return res +} + +// CompatiblePRs returns all MixPR messages with pairing descriptions matching +// the parameter. The order is unspecified. +func (p *Pool) CompatiblePRs(pairing []byte) []*wire.MsgMixPairReq { + p.mtx.RLock() + defer p.mtx.RUnlock() + + res := make([]*wire.MsgMixPairReq, 0, len(p.prs)) + for _, pr := range p.prs { + prPairing, _ := pr.Pairing() + if bytes.Equal(pairing, prPairing) { + res = append(res, pr) + } + } + + // Sort by decreasing expiries and remove any PRs double spending an + // output with an earlier expiry. + sort.Slice(res, func(i, j int) bool { + return res[i].Expiry >= res[j].Expiry + }) + seen := make(map[wire.OutPoint]uint32) + for i, pr := range res { + for _, utxo := range pr.UTXOs { + prevExpiry, ok := seen[utxo.OutPoint] + if !ok { + seen[utxo.OutPoint] = pr.Expiry + } else if pr.Expiry < prevExpiry { + res[i] = nil + } + } + } + filtered := res[:0] + for i := range res { + if res[i] != nil { + filtered = append(filtered, res[i]) + } + } + + // Sort again lexicographically by hash. + sort.Slice(filtered, func(i, j int) bool { + a := filtered[i].Hash() + b := filtered[j].Hash() + return bytes.Compare(a[:], b[:]) < 1 + }) + return filtered +} + +// ExpireMessagesInBackground will, after the current epoch period ends, +// remove all pair requests that indicate an expiry at or before the height +// parameter and removes all messages that chain back to a removed pair +// request. +// +// If a previous call is still waiting in the background to remove messages, +// this method has no effect, and proper usage to avoid a mixpool memory leak +// requires it to be consistently called as more blocks are processed. +func (p *Pool) ExpireMessagesInBackground(height uint32) { + p.mtx.Lock() + defer p.mtx.Unlock() + + if p.expireHeight == 0 { + p.expireHeight = height + } + + select { + case p.expireSem <- struct{}{}: + go p.expireMessages() + default: + } +} + +// waitForExpiry blocks for at least one full epoch, waiting until two epoch +// ticks from now. +func (p *Pool) waitForExpiry() { + now := time.Now().UTC() + epoch := now.Truncate(p.epoch).Add(2 * p.epoch) + duration := epoch.Sub(now) + time.Sleep(duration) +} + +func (p *Pool) expireMessages() { + p.waitForExpiry() + + p.mtx.Lock() + defer func() { + <-p.expireSem + p.mtx.Unlock() + }() + + height := p.expireHeight + p.expireHeight = 0 + + p.expireMessagesNow(height) +} + +// ExpireMessages immediately expires all pair requests and sessions built +// from them that indicate expiry at or after a block height. +func (p *Pool) ExpireMessages(height uint32) { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.expireMessagesNow(height) + p.expireHeight = 0 +} + +func (p *Pool) expireMessagesNow(height uint32) { + // Expire sessions and their messages + for sid, ses := range p.sessions { + if ses.expiry > height { + continue + } + + delete(p.sessions, sid) + for _, r := range ses.runs { + for hash := range r.hashes { + delete(p.pool, hash) + } + } + } + + // Expire PRs and remove identity tracking + for _, pr := range p.prs { + if pr.Expiry > height { + continue + } + + p.removePR(pr, "expired") + } + + // Expire orphans with old receive times, and in the case of any + // orphan KE, expire those with old epochs. + for hash, o := range p.orphans { + expire := time.Since(o.accepted) >= 20*time.Minute + if !expire { + if ke, ok := o.message.(*wire.MsgMixKeyExchange); ok { + epoch := time.Unix(int64(ke.Epoch), 0) + expire = time.Since(epoch) >= 20*time.Minute + } + } + if expire { + delete(p.orphans, hash) + delete(p.orphansByID, *(*idPubKey)(o.message.Pub())) + } + } +} + +// RemoveMessage removes a message that was rejected by the network. +func (p *Pool) RemoveMessage(msg mixing.Message) { + p.mtx.Lock() + defer p.mtx.Unlock() + + msgHash := msg.Hash() + delete(p.pool, msgHash) + if pr, ok := msg.(*wire.MsgMixPairReq); ok { + p.removePR(pr, "rejected") + } + if ke, ok := msg.(*wire.MsgMixKeyExchange); ok { + delete(p.latestKE, ke.Identity) + } +} + +// RemoveSession removes all non-PR messages from a completed or errored +// session. PR messages of a successful run (or rerun) are also removed. +func (p *Pool) RemoveSession(sid [32]byte, success bool) { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.removeSession(sid, nil, success) +} + +func (p *Pool) removeSession(sid [32]byte, txHash *chainhash.Hash, success bool) { + ses := p.sessions[sid] + if ses == nil { + return + } + + // Delete PRs used to form final run + var removePRs []chainhash.Hash + var lastRun *runstate + if success { + lastRun = &ses.runs[len(ses.runs)-1] + removePRs = lastRun.prs + } + + if txHash != nil || success { + if txHash == nil { + // XXX: may be better to store this in the runstate as + // a CM is received. + for h := range lastRun.hashes { + if e, ok := p.pool[h]; ok && e.msgtype == msgtypeCM { + cm := e.msg.(*wire.MsgMixConfirm) + hash := cm.Mix.TxHash() + txHash = &hash + break + } + } + } + if txHash != nil { + delete(p.sessionsByTxHash, *txHash) + } + } + + delete(p.sessions, sid) + for _, r := range ses.runs { + for hash := range r.hashes { + delete(p.pool, hash) + } + } + + for _, prHash := range removePRs { + delete(p.pool, prHash) + if pr := p.prs[prHash]; pr != nil { + p.removePR(pr, "mixed") + } + } +} + +// RemoveConfirmedRuns removes all messages including pair requests from +// runs which ended in each peer sending a confirm mix message. +func (p *Pool) RemoveConfirmedRuns() { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.removeConfirmedRuns() +} + +func (p *Pool) removeConfirmedRuns() { + for sid, ses := range p.sessions { + lastRun := &ses.runs[len(ses.runs)-1] + cmCount := lastRun.countFor(msgtypeCM) + if uint32(len(lastRun.prs)) != cmCount { + continue + } + + delete(p.sessions, sid) + for _, run := range ses.runs { + for hash := range run.hashes { + delete(p.pool, hash) + } + } + + for _, hash := range lastRun.prs { + delete(p.pool, hash) + pr := p.prs[hash] + if pr != nil { + p.removePR(pr, "confirmed") + } + } + } +} + +// RemoveConfirmedMixes removes sessions and messages belonging to a completed +// session that resulted in published or mined transactions. Transaction +// hashes not associated with a session are ignored. PRs from the successful +// mix run are removed from the pool. +func (p *Pool) RemoveConfirmedMixes(txHashes []chainhash.Hash) { + p.mtx.Lock() + defer p.mtx.Unlock() + + for i := range txHashes { + hash := &txHashes[i] + ses := p.sessionsByTxHash[*hash] + if ses == nil { + continue + } + + p.removeSession(ses.sid, hash, true) + } +} + +// RemoveSpentPRs removes all pair requests that are spent by any transaction +// input. +func (p *Pool) RemoveSpentPRs(txs []*wire.MsgTx) { + p.mtx.Lock() + defer p.mtx.Unlock() + + for _, tx := range txs { + txHash := tx.TxHash() + ses, ok := p.sessionsByTxHash[txHash] + if ok { + p.removeSession(ses.sid, &txHash, true) + continue + } + + for _, in := range tx.TxIn { + prHash := p.outPoints[in.PreviousOutPoint] + pr, ok := p.prs[prHash] + if ok { + p.removePR(pr, "double spent") + } + } + } +} + +// ReceiveKEsByPairing returns the most recently received run-0 KE messages by +// a peer that reference PRs of a particular pairing and epoch. +func (p *Pool) ReceiveKEsByPairing(pairing []byte, epoch uint64) []*wire.MsgMixKeyExchange { + p.mtx.RLock() + defer p.mtx.RUnlock() + + var kes []*wire.MsgMixKeyExchange + for id, ke := range p.latestKE { + if ke.Epoch != epoch { + continue + } + prHash := p.messagesByIdentity[id][0] + pr := p.prs[prHash] + prPairing, err := pr.Pairing() + if err != nil { + continue + } + if bytes.Equal(pairing, prPairing) { + kes = append(kes, ke) + } + } + return kes +} + +// RemoveUnresponsiveDuringEpoch removes pair requests of unresponsive peers +// that did not provide any key exchange messages during the epoch in which a +// mix occurred. +func (p *Pool) RemoveUnresponsiveDuringEpoch(prs []*wire.MsgMixPairReq, epoch uint64) { + p.mtx.Lock() + defer p.mtx.Unlock() + +PRLoop: + for _, pr := range prs { + for _, msgHash := range p.messagesByIdentity[pr.Identity] { + msg, ok := p.pool[msgHash].msg.(*wire.MsgMixKeyExchange) + if !ok { + continue + } + if msg.Epoch == epoch { + continue PRLoop + } + } + + p.removePR(pr, "unresponsive") + } +} + +// Received is a parameter for Pool.Receive describing the session and run to +// receive messages for, and slices for returning results. A single non-nil +// slice is required and indicates which message slice will be will be +// appended to. Received messages are unsorted. +type Received struct { + Sid [32]byte + Run uint32 + KEs []*wire.MsgMixKeyExchange + CTs []*wire.MsgMixCiphertexts + SRs []*wire.MsgMixSlotReserve + DCs []*wire.MsgMixDCNet + CMs []*wire.MsgMixConfirm + FPs []*wire.MsgMixFactoredPoly + RSs []*wire.MsgMixSecrets +} + +// Receive returns messages matching a session, run, and message type, waiting +// until all described messages have been received, or earlier with the +// messages received so far if the context is cancelled before this point. +// +// Receive only returns results for the session ID and run increment in the r +// parameter. If no such session or run has any messages currently accepted +// in the mixpool, the method immediately errors. +// +// If any secrets messages are received for the described session and run, and +// r.RSs is nil, Receive immediately returns ErrSecretsRevealed. An +// additional call to Receive with a non-nil RSs can be used to receive all of +// the secrets after each peer publishes their own revealed secrets. +func (p *Pool) Receive(ctx context.Context, expectedMessages int, r *Received) error { + sid := r.Sid + run := r.Run + var bc *broadcast + var rs *runstate + + p.mtx.RLock() + ses, ok := p.sessions[sid] + if !ok { + p.mtx.RUnlock() + return fmt.Errorf("unknown session %x", sid[:]) + } + bc = &ses.bc + if run >= uint32(len(ses.runs)) { + p.mtx.RUnlock() + return fmt.Errorf("unknown run %d", run) + } + rs = &ses.runs[run] + + nonNilSlices := 0 + if r.KEs != nil { + nonNilSlices++ + } + if r.CTs != nil { + nonNilSlices++ + } + if r.SRs != nil { + nonNilSlices++ + } + if r.DCs != nil { + nonNilSlices++ + } + if r.CMs != nil { + nonNilSlices++ + } + if r.FPs != nil { + nonNilSlices++ + } + if r.RSs != nil { + nonNilSlices++ + } + if nonNilSlices != 1 { + return fmt.Errorf("mixpool: exactly one Received slice must be non-nil") + } + +Loop: + for { + // Pool is locked for reads. Count if the total number of + // expected messages have been received. + received := 0 + for hash := range rs.hashes { + msgtype := p.pool[hash].msgtype + switch { + case msgtype == msgtypeKE && r.KEs != nil: + received++ + case msgtype == msgtypeCT && r.CTs != nil: + received++ + case msgtype == msgtypeSR && r.SRs != nil: + received++ + case msgtype == msgtypeDC && r.DCs != nil: + received++ + case msgtype == msgtypeCM && r.CMs != nil: + received++ + case msgtype == msgtypeFP && r.FPs != nil: + received++ + case msgtype == msgtypeRS: + if r.RSs == nil { + // Since initial reporters of secrets + // need to take the blame for + // erroneous blame assignment if no + // issue was detected, we only trigger + // this for RS messages that do not + // reference any other previous RS. + rs := p.pool[hash].msg.(*wire.MsgMixSecrets) + prev := rs.PrevMsgs() + if len(prev) == 0 { + p.mtx.RUnlock() + return ErrSecretsRevealed + } + } else { + received++ + } + } + } + if received >= expectedMessages { + break + } + + // Unlock while waiting for the broadcast channel. + p.mtx.RUnlock() + + select { + case <-ctx.Done(): + p.mtx.RLock() + break Loop + case <-bc.wait(): + } + + p.mtx.RLock() + } + + // Pool is locked for reads. Collect all of the messages. + for hash := range rs.hashes { + msg := p.pool[hash].msg + switch msg := msg.(type) { + case *wire.MsgMixKeyExchange: + if r.KEs != nil { + r.KEs = append(r.KEs, msg) + } + case *wire.MsgMixCiphertexts: + if r.CTs != nil { + r.CTs = append(r.CTs, msg) + } + case *wire.MsgMixSlotReserve: + if r.SRs != nil { + r.SRs = append(r.SRs, msg) + } + case *wire.MsgMixDCNet: + if r.DCs != nil { + r.DCs = append(r.DCs, msg) + } + case *wire.MsgMixConfirm: + if r.CMs != nil { + r.CMs = append(r.CMs, msg) + } + case *wire.MsgMixFactoredPoly: + if r.FPs != nil { + r.FPs = append(r.FPs, msg) + } + case *wire.MsgMixSecrets: + if r.RSs != nil { + r.RSs = append(r.RSs, msg) + } + } + } + + p.mtx.RUnlock() + return nil +} + +var zeroHash chainhash.Hash + +// AcceptMessage accepts a mixing message to the pool. +// +// Messages must contain the mixing participant's identity and contain a valid +// signature committing to all non-signature fields. +// +// PR messages will not be accepted if they reference an unknown UTXO or if not +// enough fee is contributed. Any other message will not be accepted if it +// references previous messages that are not recorded by the pool. +// +// All newly accepted messages, including any orphan key exchange messages +// that were processed after processing missing pair requests, are returned. +func (p *Pool) AcceptMessage(msg mixing.Message) (accepted []mixing.Message, err error) { + defer func() { + if err == nil { + log.Tracef("AcceptMessage: accepted message %T %v", msg, msg.Hash()) + } else { + log.Tracef("AcceptMessage: rejected message %T %v: %v", msg, msg.Hash(), err) + } + }() + + hash := msg.Hash() + if hash == zeroHash { + return nil, fmt.Errorf("message of type %T has not been hashed", msg) + } + + alreadyAccepted := func() bool { + _, ok := p.pool[hash] + if !ok { + _, ok = p.prs[hash] + } + return ok + } + + // Check if already accepted. + p.mtx.RLock() + ok := alreadyAccepted() + p.mtx.RUnlock() + if ok { + return nil, nil + } + + // Require message to be signed by the presented identity. + if !mixing.VerifySignedMessage(msg) { + return nil, ruleError(ErrInvalidSignature) + } + id := (*idPubKey)(msg.Pub()) + + var msgtype msgtype + switch msg := msg.(type) { + case *wire.MsgMixPairReq: + accepted, err := p.acceptPR(msg, &hash, id) + if err != nil { + return nil, err + } + // Avoid returning a non-nil mixing.Message in return + // variable with a nil PR. + if accepted == nil { + return nil, nil + } + + allAccepted := p.reconsiderOrphans(msg, id) + return allAccepted, nil + + case *wire.MsgMixKeyExchange: + accepted, err := p.acceptKE(msg, &hash, id) + if err != nil { + return nil, err + } + // Avoid returning a non-nil mixing.Message in return + // variable with a nil KE. + if accepted == nil { + return nil, nil + } + allAccepted := p.reconsiderOrphans(msg, id) + return allAccepted, nil + + case *wire.MsgMixCiphertexts: + msgtype = msgtypeCT + case *wire.MsgMixSlotReserve: + msgtype = msgtypeSR + case *wire.MsgMixDCNet: + msgtype = msgtypeDC + case *wire.MsgMixConfirm: + msgtype = msgtypeCM + case *wire.MsgMixFactoredPoly: + msgtype = msgtypeFP + case *wire.MsgMixSecrets: + msgtype = msgtypeRS + default: + return nil, fmt.Errorf("unknown mix message type %T", msg) + } + + if len(msg.Sid()) != 32 { + return nil, ruleError(ErrInvalidSessionID) + } + sid := *(*[32]byte)(msg.Sid()) + + p.mtx.Lock() + defer p.mtx.Unlock() + + // Read lock was given up to acquire write lock. Check if already + // accepted. + if alreadyAccepted() { + return nil, nil + } + + // Check that a message from this identity does not reuse a run number + // for the session. + var haveKE bool + for _, prevHash := range p.messagesByIdentity[*id] { + e := p.pool[prevHash] + run := msg.GetRun() + if e.msgtype == msgtype && e.msg.GetRun() == run && + bytes.Equal(e.msg.Sid(), msg.Sid()) { + return nil, fmt.Errorf("message %v by identity %x "+ + "reuses run number %d in session %x, "+ + "conflicting with already accepted message %v", + hash, *id, run, msg.Sid(), prevHash) + } + if e.msgtype == msgtypeKE && e.msg.GetRun() == run && + bytes.Equal(e.msg.Sid(), msg.Sid()) { + haveKE = true + } + } + // Save as an orphan if their KE is not (yet) known. + if !haveKE { + orphansByID := p.orphansByID[*id] + if _, ok := orphansByID[hash]; ok { + // Already an orphan. + return nil, nil + } + if orphansByID == nil { + orphansByID = make(map[chainhash.Hash]mixing.Message) + p.orphansByID[*id] = orphansByID + } + p.orphans[hash] = &orphan{ + message: msg, + accepted: time.Now(), + } + orphansByID[hash] = msg + + // TODO: Consider return an error containing the unknown + // messages, so they can be getdata'd. + return nil, nil + } + + ses := p.sessions[sid] + if ses == nil { + return nil, fmt.Errorf("%s %s belongs to unknown session %x", + msgtype, &hash, &sid) + } + + err = p.acceptEntry(msg, msgtype, &hash, id, ses) + if err != nil { + return nil, err + } + return []mixing.Message{msg}, nil +} + +// removePR removes a pair request message and all other messages and sessions +// that the peer sent and was involved in. +func (p *Pool) removePR(pr *wire.MsgMixPairReq, reason string) { + prHash := pr.Hash() + + log.Debugf("Removing %s PR %s by %x", reason, prHash, pr.Identity[:]) + + delete(p.prs, prHash) + for _, hash := range p.messagesByIdentity[pr.Identity] { + e, ok := p.pool[hash] + if !ok { + continue + } + ke, ok := e.msg.(*wire.MsgMixKeyExchange) + if ok { + p.removeSession(ke.SessionID, nil, false) + } + delete(p.pool, hash) + } + delete(p.messagesByIdentity, pr.Identity) + delete(p.latestKE, pr.Identity) + for orphanHash := range p.orphansByID[pr.Identity] { + delete(p.orphans, orphanHash) + } + delete(p.orphansByID, pr.Identity) + for i := range pr.UTXOs { + delete(p.outPoints, pr.UTXOs[i].OutPoint) + } +} + +func (p *Pool) acceptPR(pr *wire.MsgMixPairReq, hash *chainhash.Hash, id *idPubKey) (accepted *wire.MsgMixPairReq, err error) { + switch { + case len(pr.UTXOs) == 0: // Require at least one utxo. + return nil, ruleError(ErrMissingUTXOs) + case pr.MessageCount == 0: // Require at least one mixed message. + return nil, ruleError(ErrInvalidMessageCount) + case pr.InputValue < int64(pr.MessageCount)*pr.MixAmount: + return nil, ruleError(ErrInvalidTotalMixAmount) + case pr.Change != nil: + if isDustAmount(pr.Change.Value, p2pkhv0PkScriptSize, feeRate) { + return nil, ruleError(ErrChangeDust) + } + if !stdscript.IsPubKeyHashScriptV0(pr.Change.PkScript) && + !stdscript.IsScriptHashScriptV0(pr.Change.PkScript) { + return nil, ruleError(ErrInvalidScript) + } + } + + // Check that expiry has not been reached, nor that it is too far + // into the future. This limits replay attacks. + _, curHeight := p.blockchain.CurrentTip() + maxExpiry := mixing.MaxExpiry(uint32(curHeight), p.params) + switch { + case uint32(curHeight) >= pr.Expiry: + return nil, fmt.Errorf("message has expired") + case pr.Expiry > maxExpiry: + return nil, fmt.Errorf("expiry is too far into future") + } + + // Require known script classes. + switch mixing.ScriptClass(pr.ScriptClass) { + case mixing.ScriptClassP2PKHv0: + default: + return nil, fmt.Errorf("unsupported mixing script class") + } + + // Require enough fee contributed from this mixing participant. + // Size estimation assumes mixing.ScriptClassP2PKHv0 outputs and inputs. + err = checkFee(pr, p.feeRate) + if err != nil { + return nil, err + } + + // If able, sanity check UTXOs. + if p.utxoFetcher != nil { + err := p.checkUTXOs(pr, curHeight) + if err != nil { + return nil, err + } + } + + p.mtx.Lock() + defer p.mtx.Unlock() + + // Check if already accepted. + if _, ok := p.prs[*hash]; ok { + return nil, nil + } + + // Discourage identity reuse. PRs should be the first message sent by + // this identity, and there should only be one PR per identity. + if len(p.messagesByIdentity[*id]) != 0 { + return nil, fmt.Errorf("identity reused for a PR message") + } + + // Only accept PRs that double spend outpoints if they expire later + // than existing PRs. Otherwise, reject this PR message. + for i := range pr.UTXOs { + otherPRHash := p.outPoints[pr.UTXOs[i].OutPoint] + otherPR, ok := p.prs[otherPRHash] + if !ok { + continue + } + if otherPR.Expiry >= pr.Expiry { + err := fmt.Errorf("PR double spends outpoints of " + + "already-accepted PR message without " + + "increasing expiry") + return nil, err + } + } + + // Accept the PR + p.prs[*hash] = pr + for i := range pr.UTXOs { + p.outPoints[pr.UTXOs[i].OutPoint] = *hash + } + p.messagesByIdentity[*id] = append(make([]chainhash.Hash, 0, 16), *hash) + + return pr, nil +} + +// reconsiderOrphans reconsiders any messages that are currently saved as +// orphans due to missing previous PR message (in the case of KE orphans) or +// missing the identity's KE in matching session and run (for all other +// messages). The function is recursive: if a reconsidered orphan KE is +// accepted, other orphans by the identity will be considered as well. +func (p *Pool) reconsiderOrphans(accepted mixing.Message, id *idPubKey) []mixing.Message { + acceptedMessages := []mixing.Message{accepted} + + var kes []*wire.MsgMixKeyExchange + if ke, ok := accepted.(*wire.MsgMixKeyExchange); ok { + kes = append(kes, ke) + } + + // If the accepted message was a PR, there may be KE orphans that can + // be accepted now. + if pr, ok := accepted.(*wire.MsgMixPairReq); ok { + // Orphan KEs must be accepted in run order. Find all + // possible matching ones and sort by run. + var orphanKEs []*wire.MsgMixKeyExchange + for _, orphan := range p.orphansByID[*id] { + orphanKE, ok := orphan.(*wire.MsgMixKeyExchange) + if !ok { + continue + } + refsAcceptedPR := false + for _, prHash := range orphanKE.SeenPRs { + if pr.Hash() == prHash { + refsAcceptedPR = true + break + } + } + if !refsAcceptedPR { + continue + } + + orphanKEs = append(orphanKEs, orphanKE) + } + sort.Slice(orphanKEs, func(i, j int) bool { + return orphanKEs[i].Run < orphanKEs[j].Run + }) + + for _, orphanKE := range orphanKEs { + orphanKEHash := orphanKE.Hash() + _, err := p.acceptKE(orphanKE, &orphanKEHash, &orphanKE.Identity) + if err != nil { + log.Debugf("orphan KE could not be accepted: %v", err) + continue + } + + kes = append(kes, orphanKE) + delete(p.orphansByID[*id], orphanKEHash) + delete(p.orphans, orphanKEHash) + + acceptedMessages = append(acceptedMessages, orphanKE) + } + if len(p.orphansByID[*id]) == 0 { + delete(p.orphansByID, *id) + return acceptedMessages + } + } + + // For any KE that has been accepted following reconsideration after + // accepting a PR, other orphan messages may be potentially accepted + // as well. + for _, ke := range kes { + ses := p.sessions[ke.SessionID] + if ses == nil { + log.Errorf("No session %x exists for accepted KE %s", + ke.SessionID[:], ke.Hash()) + continue + } + + var acceptedOrphans []mixing.Message + for orphanHash, orphan := range p.orphansByID[*id] { + if !bytes.Equal(orphan.Sid(), ke.SessionID[:]) { + continue + } + if orphan.GetRun() != ke.Run { + continue + } + + var msgtype msgtype + switch orphan.(type) { + case *wire.MsgMixCiphertexts: + msgtype = msgtypeCT + case *wire.MsgMixSlotReserve: + msgtype = msgtypeSR + case *wire.MsgMixDCNet: + msgtype = msgtypeDC + case *wire.MsgMixConfirm: + msgtype = msgtypeCM + case *wire.MsgMixFactoredPoly: + msgtype = msgtypeFP + case *wire.MsgMixSecrets: + msgtype = msgtypeRS + default: + log.Errorf("Unknown orphan message %T %s", orphan, orphan.Hash()) + continue + } + + err := p.acceptEntry(orphan, msgtype, &orphanHash, id, ses) + if err != nil { + log.Debugf("Orphan %v by identity %x could not be "+ + "processed after accepting KE %v", + orphanHash, id[:], ke.Hash()) + continue + } + + acceptedOrphans = append(acceptedOrphans, orphan) + acceptedMessages = append(acceptedMessages, orphan) + } + for _, orphan := range acceptedOrphans { + orphanHash := orphan.Hash() + delete(p.orphansByID[*id], orphanHash) + delete(p.orphans, orphanHash) + } + if len(p.orphansByID[*id]) == 0 { + delete(p.orphansByID, *id) + return acceptedMessages + } + } + + return acceptedMessages +} + +// Check that UTXOs exist, have confirmations, sum of UTXO values matches the +// input value, and proof of ownership is valid. +func (p *Pool) checkUTXOs(pr *wire.MsgMixPairReq, curHeight int64) error { + var totalValue int64 + + for i := range pr.UTXOs { + utxo := &pr.UTXOs[i] + entry, err := p.utxoFetcher.FetchUtxoEntry(utxo.OutPoint) + if err != nil { + return err + } + if entry == nil || entry.IsSpent() { + return fmt.Errorf("output %v is not unspent", + &utxo.OutPoint) + } + height := entry.BlockHeight() + if !confirmed(minconf, height, curHeight) { + return fmt.Errorf("output %v is unconfirmed", + &utxo.OutPoint) + } + if entry.ScriptVersion() != 0 { + return fmt.Errorf("output %v does not use script version 0", + &utxo.OutPoint) + } + + // Check proof of key ownership and ability to sign coinjoin + // inputs. + var extractPubKeyHash160 func([]byte) []byte + switch { + case utxo.Opcode == 0: + extractPubKeyHash160 = stdscript.ExtractPubKeyHashV0 + case utxo.Opcode == txscript.OP_SSGEN: + extractPubKeyHash160 = stdscript.ExtractStakeGenPubKeyHashV0 + case utxo.Opcode == txscript.OP_SSRTX: + extractPubKeyHash160 = stdscript.ExtractStakeRevocationPubKeyHashV0 + case utxo.Opcode == txscript.OP_TGEN: + extractPubKeyHash160 = stdscript.ExtractTreasuryGenPubKeyHashV0 + default: + return fmt.Errorf("unsupported output script for UTXO %s", &utxo.OutPoint) + } + valid := validateOwnerProofP2PKHv0(extractPubKeyHash160, + entry.PkScript(), utxo.PubKey, utxo.Signature, pr.Expires()) + if !valid { + return ruleError(ErrInvalidUTXOProof) + } + + totalValue += entry.Amount() + } + + if totalValue != pr.InputValue { + return fmt.Errorf("input value does not match sum of UTXO " + + "values") + } + + return nil +} + +func validateOwnerProofP2PKHv0(extractFunc func([]byte) []byte, pkscript, pubkey, sig []byte, expires uint32) bool { + extractedHash160 := extractFunc(pkscript) + pubkeyHash160 := dcrutil.Hash160(pubkey) + if !bytes.Equal(extractedHash160, pubkeyHash160) { + return false + } + + return utxoproof.ValidateSecp256k1P2PKH(pubkey, sig, expires) +} + +func (p *Pool) acceptKE(ke *wire.MsgMixKeyExchange, hash *chainhash.Hash, id *idPubKey) (accepted *wire.MsgMixKeyExchange, err error) { + // Validate PR order and session ID. + err = mixing.ValidateSession(ke) + if err != nil { + return nil, ruleError(err) + } + + if ke.Pos >= uint32(len(ke.SeenPRs)) { + err := fmt.Errorf("peer position is an invalid seen PRs position") + return nil, ruleError(err) + } + + p.mtx.Lock() + defer p.mtx.Unlock() + + // Check if already accepted. + if _, ok := p.pool[*hash]; ok { + return nil, nil + } + + // While KEs are allowed to reference unknown PRs, they must at least + // reference the PR submitted by their own identity. If not, the KE + // is saved as an orphan and may be processed later. + // Of all PRs that are known, their pairing types must be compatible. + var missingOwnPR *chainhash.Hash + prs := make([]*wire.MsgMixPairReq, 0, len(ke.SeenPRs)) + var pairing []byte + for i := range ke.SeenPRs { + seenPR := &ke.SeenPRs[i] + pr, ok := p.prs[*seenPR] + if !ok { + if uint32(i) == ke.Pos { + missingOwnPR = seenPR + } + continue + } + if uint32(i) == ke.Pos && pr.Identity != ke.Identity { + err := fmt.Errorf("KE identity does not match own PR " + + "at unmixed position") + return nil, ruleError(err) + } + if pairing == nil { + var err error + pairing, err = pr.Pairing() + if err != nil { + return nil, err + } + } else { + pairing2, err := pr.Pairing() + if err != nil { + return nil, err + } + if !bytes.Equal(pairing, pairing2) { + err := fmt.Errorf("referenced PRs are incompatible") + return nil, err + } + } + } + if missingOwnPR != nil { + p.orphans[*hash] = &orphan{ + message: ke, + accepted: time.Now(), + } + orphansByID := p.orphansByID[*id] + if orphansByID == nil { + orphansByID = make(map[chainhash.Hash]mixing.Message) + p.orphansByID[*id] = orphansByID + } + orphansByID[*hash] = ke + err := &MissingOwnPRError{ + MissingPR: *missingOwnPR, + } + return nil, err + } + + sid := ke.SessionID + ses := p.sessions[sid] + + // Create a session for the first run-0 KE + if ses == nil { + if ke.Run != 0 { + err := fmt.Errorf("unknown session for run-%d KE", + ke.Run) + return nil, err + } + + expiry := ^uint32(0) + for i := range prs { + prExpiry := prs[i].Expires() + if expiry > prExpiry { + expiry = prExpiry + } + } + ses = &session{ + sid: sid, + runs: make([]runstate, 0, 4), + expiry: expiry, + bc: broadcast{ch: make(chan struct{})}, + } + p.sessions[sid] = ses + } + + err = p.acceptEntry(ke, msgtypeKE, hash, id, ses) + if err != nil { + return nil, err + } + p.latestKE[*id] = ke + return ke, nil +} + +func (p *Pool) acceptEntry(msg mixing.Message, msgtype msgtype, hash *chainhash.Hash, + id *[33]byte, ses *session) error { + + run := msg.GetRun() + if run > uint32(len(ses.runs)) { + return fmt.Errorf("message skips runs") + } + + var rs *runstate + if msgtype == msgtypeKE && run == uint32(len(ses.runs)) { + // Add a runstate for the next run. + ses.runs = append(ses.runs, runstate{ + prs: msg.PrevMsgs(), + hashes: make(map[chainhash.Hash]struct{}), + }) + rs = &ses.runs[len(ses.runs)-1] + } else { + // Add to existing runstate + rs = &ses.runs[run] + } + + rs.hashes[*hash] = struct{}{} + e := entry{ + hash: *hash, + sid: ses.sid, + recvTime: time.Now(), + msg: msg, + msgtype: msgtype, + run: run, + } + p.pool[*hash] = e + p.messagesByIdentity[*id] = append(p.messagesByIdentity[*id], *hash) + + if cm, ok := msg.(*wire.MsgMixConfirm); ok { + p.sessionsByTxHash[cm.Mix.TxHash()] = ses + } + + rs.incrementCountFor(msgtype) + ses.bc.signal() + + return nil +} + +func confirmed(minConf, txHeight, curHeight int64) bool { + return confirms(txHeight, curHeight) >= minConf +} + +func confirms(txHeight, curHeight int64) int64 { + switch { + case txHeight == -1, txHeight > curHeight: + return 0 + default: + return curHeight - txHeight + 1 + } +} + +// isDustAmount determines whether a transaction output value and script length would +// cause the output to be considered dust. Transactions with dust outputs are +// not standard and are rejected by mempools with default policies. +func isDustAmount(amount int64, scriptSize int, relayFeePerKb int64) bool { + // Calculate the total (estimated) cost to the network. This is + // calculated using the serialize size of the output plus the serial + // size of a transaction input which redeems it. The output is assumed + // to be compressed P2PKH as this is the most common script type. Use + // the average size of a compressed P2PKH redeem input (165) rather than + // the largest possible (txsizes.RedeemP2PKHInputSize). + totalSize := 8 + 2 + wire.VarIntSerializeSize(uint64(scriptSize)) + + scriptSize + 165 + + // Dust is defined as an output value where the total cost to the network + // (output size + input size) is greater than 1/3 of the relay fee. + return amount*1000/(3*int64(totalSize)) < relayFeePerKb +} + +func checkFee(pr *wire.MsgMixPairReq, feeRate int64) error { + fee := pr.InputValue - int64(pr.MessageCount)*pr.MixAmount + if pr.Change != nil { + fee -= pr.Change.Value + } + + estimatedSize := estimateP2PKHv0SerializeSize(len(pr.UTXOs), + int(pr.MessageCount), pr.Change != nil) + requiredFee := feeForSerializeSize(feeRate, estimatedSize) + if fee < requiredFee { + return fmt.Errorf("not enough input value, or too low fee") + } + + return nil +} + +func feeForSerializeSize(relayFeePerKb int64, txSerializeSize int) int64 { + fee := relayFeePerKb * int64(txSerializeSize) / 1000 + + if fee == 0 && relayFeePerKb > 0 { + fee = relayFeePerKb + } + + const maxAmount = 21e6 * 1e8 + if fee < 0 || fee > maxAmount { + fee = maxAmount + } + + return fee +} + +const ( + redeemP2PKHv0SigScriptSize = 1 + 73 + 1 + 33 + p2pkhv0PkScriptSize = 1 + 1 + 1 + 20 + 1 + 1 +) + +func estimateP2PKHv0SerializeSize(inputs, outputs int, hasChange bool) int { + // Sum the estimated sizes of the inputs and outputs. + txInsSize := inputs * estimateInputSize(redeemP2PKHv0SigScriptSize) + txOutsSize := outputs * estimateOutputSize(p2pkhv0PkScriptSize) + + changeSize := 0 + if hasChange { + changeSize = estimateOutputSize(p2pkhv0PkScriptSize) + outputs++ + } + + // 12 additional bytes are for version, locktime and expiry. + return 12 + (2 * wire.VarIntSerializeSize(uint64(inputs))) + + wire.VarIntSerializeSize(uint64(outputs)) + + txInsSize + txOutsSize + changeSize +} + +// estimateInputSize returns the worst case serialize size estimate for a tx input +func estimateInputSize(scriptSize int) int { + return 32 + // previous tx + 4 + // output index + 1 + // tree + 8 + // amount + 4 + // block height + 4 + // block index + wire.VarIntSerializeSize(uint64(scriptSize)) + // size of script + scriptSize + // script itself + 4 // sequence +} + +// estimateOutputSize returns the worst case serialize size estimate for a tx output +func estimateOutputSize(scriptSize int) int { + return 8 + // previous tx + 2 + // version + wire.VarIntSerializeSize(uint64(scriptSize)) + // size of script + scriptSize // script itself +} diff --git a/mixing/mixpool/mixpool_test.go b/mixing/mixpool/mixpool_test.go new file mode 100644 index 0000000000..8c8aba3900 --- /dev/null +++ b/mixing/mixpool/mixpool_test.go @@ -0,0 +1,408 @@ +// Copyright (c) 2023-2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mixpool + +import ( + cryptorand "crypto/rand" + "encoding/hex" + "errors" + "flag" + "fmt" + "io" + "math/big" + "os" + "testing" + + "decred.org/cspp/v2/solverrpc" + "github.com/davecgh/go-spew/spew" + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/chaincfg/v3" + "github.com/decred/dcrd/crypto/blake256" + "github.com/decred/dcrd/dcrec/secp256k1/v4" + "github.com/decred/dcrd/dcrutil/v4" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/internal/chacha20prng" + "github.com/decred/dcrd/mixing/utxoproof" + "github.com/decred/dcrd/txscript/v4" + "github.com/decred/dcrd/wire" +) + +var params = chaincfg.SimNetParams() + +var seed [32]byte + +func testPRNG(t *testing.T) *chacha20prng.Reader { + t.Logf("PRNG seed: %x\n", seed) + return chacha20prng.New(seed[:], 0) +} + +var utxoStore struct { + byName map[string]*mockUTXO + byOutpoint map[wire.OutPoint]UtxoEntry +} + +func TestMain(m *testing.M) { + seedFlag := flag.String("seed", "", "use deterministic PRNG seed (32 bytes, hex)") + flag.Parse() + if *seedFlag != "" { + b, err := hex.DecodeString(*seedFlag) + if err != nil { + fmt.Fprintln(os.Stderr, "invalid -seed:", err) + os.Exit(1) + } + if len(b) != 32 { + fmt.Fprintln(os.Stderr, "invalid -seed: must be 32 bytes") + os.Exit(1) + } + copy(seed[:], b) + } else { + cryptorand.Read(seed[:]) + } + + rand := chacha20prng.New(seed[:], 0) + + utxoStore.byName = makeMockUTXOs(rand) + utxoStore.byOutpoint = make(map[wire.OutPoint]UtxoEntry) + for _, m := range utxoStore.byName { + utxoStore.byOutpoint[m.outpoint] = m + } + + os.Exit(m.Run()) +} + +func generateSecp256k1(rand io.Reader) (*secp256k1.PublicKey, *secp256k1.PrivateKey, error) { + if rand == nil { + rand = cryptorand.Reader + } + + privateKey, err := secp256k1.GeneratePrivateKeyFromRand(rand) + if err != nil { + return nil, nil, err + } + + publicKey := privateKey.PubKey() + + return publicKey, privateKey, nil +} + +type mockUTXO struct { + outpoint wire.OutPoint + tx *wire.MsgTx + output *wire.TxOut + blockHeight int64 + pubkey []byte + privkey *secp256k1.PrivateKey +} + +func (m *mockUTXO) IsSpent() bool { return false } +func (m *mockUTXO) PkScript() []byte { return m.output.PkScript } +func (m *mockUTXO) ScriptVersion() uint16 { return 0 } +func (m *mockUTXO) BlockHeight() int64 { return m.blockHeight } +func (m *mockUTXO) Amount() int64 { return m.output.Value } + +func makeMockUTXO(rand io.Reader, value int64) *mockUTXO { + tx := wire.NewMsgTx() + + pub, priv, err := generateSecp256k1(rand) + if err != nil { + panic(err) + } + pubSerialized := pub.SerializeCompressed() + + p2pkhScript := []byte{ + 0: txscript.OP_DUP, + 1: txscript.OP_HASH160, + 2: txscript.OP_DATA_20, + 23: txscript.OP_EQUALVERIFY, + 24: txscript.OP_CHECKSIG, + } + hash160 := dcrutil.Hash160(pubSerialized) + copy(p2pkhScript[3:23], hash160) + + output := wire.NewTxOut(value, p2pkhScript) + tx.AddTxOut(output) + + return &mockUTXO{ + outpoint: wire.OutPoint{ + Hash: tx.TxHash(), + Index: 0, + Tree: 0, + }, + tx: tx, + output: output, + pubkey: pubSerialized, + privkey: priv, + } +} + +func (m *mockUTXO) mixprutxo(expires uint32) wire.MixPairReqUTXO { + k := utxoproof.Secp256k1KeyPair{ + Pub: m.pubkey, + Priv: m.privkey, + } + sig, err := k.SignUtxoProof(expires) + if err != nil { + panic(err) + } + + return wire.MixPairReqUTXO{ + OutPoint: m.outpoint, + PubKey: m.pubkey, + Signature: sig, + } +} + +func makeMockUTXOs(rand io.Reader) map[string]*mockUTXO { + return map[string]*mockUTXO{ + "A": makeMockUTXO(rand, 20e8), + } +} + +type fakechain struct { + hash chainhash.Hash + height int64 +} + +func (c *fakechain) ChainParams() *chaincfg.Params { + return params +} + +func (c *fakechain) CurrentTip() (chainhash.Hash, int64) { + return c.hash, c.height +} + +func (c *fakechain) FetchUtxoEntry(op wire.OutPoint) (UtxoEntry, error) { + entry := utxoStore.byOutpoint[op] + if entry == nil { + return nil, errors.New("no utxo entry") + } + return entry, nil +} + +func TestAccept(t *testing.T) { + t.Parallel() + testRand := testPRNG(t) + + c := new(fakechain) + c.height = 1000 + p := NewPool(c) + + identityPub, identityPriv, err := generateSecp256k1(testRand) + if err != nil { + t.Fatal(err) + } + identity := *(*[33]byte)(identityPub.SerializeCompressed()) + + h := blake256.New() + + var ( + expires uint32 = 1010 + mixAmount int64 = 10e8 + scriptClass = mixing.ScriptClassP2PKHv0 + txVersion uint16 = wire.TxVersion + lockTime uint32 = 0 + messageCount uint32 = 1 + inputValue int64 = 20e8 + utxos []wire.MixPairReqUTXO + change *wire.TxOut + flags byte = 0 + pairingFlags byte = 0 + ) + utxos = []wire.MixPairReqUTXO{ + utxoStore.byName["A"].mixprutxo(expires), + } + pr, err := wire.NewMsgMixPairReq(identity, expires, mixAmount, + string(scriptClass), txVersion, lockTime, messageCount, + inputValue, utxos, change, flags, pairingFlags) + if err != nil { + t.Fatal(err) + } + pr.WriteHash(h) + err = mixing.SignMessage(pr, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(pr) + if err != nil { + t.Fatal(err) + } + + prngSeed := testRand.Next(32) + runPRNG := chacha20prng.New(prngSeed, 0) + kx, err := mixing.NewKX(runPRNG) + if err != nil { + t.Fatal(err) + } + + // Generate unpadded SR and DC messages. + var msize uint32 = 20 + var mcount uint32 = 2 + srMsg := make([]*big.Int, mcount) + for i := range srMsg { + srMsg[i], err = cryptorand.Int(testRand, mixing.F) + if err != nil { + t.Fatal(err) + } + } + dcMsg := make([][]byte, mcount) + for i := range dcMsg { + dcMsg[i] = testRand.Next(int(msize)) + } + t.Logf("SR messages %+x\n", srMsg) + t.Logf("DC messages %+x\n", dcMsg) + + var ( + seenPRs = []chainhash.Hash{pr.Hash()} + epoch uint64 = 0 + sid [32]byte = mixing.SortPRsForSession([]*wire.MsgMixPairReq{pr}, epoch) + run uint32 = 0 + pos uint32 = 0 + ecdh [33]byte = *(*[33]byte)(kx.ECDHPublicKey.SerializeCompressed()) + pqpk [1218]byte = *kx.PQPublicKey + commitment [32]byte // XXX: hash of RS message + ) + ke := wire.NewMsgMixKeyExchange(identity, sid, epoch, run, pos, ecdh, + pqpk, commitment, seenPRs) + ke.WriteHash(h) + err = mixing.SignMessage(ke, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(ke) + if err != nil { + t.Fatal(err) + } + + pqPubkeys := []*mixing.PQPublicKey{kx.PQPublicKey} + ciphertexts, err := kx.Encapsulate(runPRNG, pqPubkeys, 0) + if err != nil { + t.Fatal(err) + } + + seenKEs := []chainhash.Hash{ke.Hash()} + ct := wire.NewMsgMixCiphertexts(identity, sid, run, ciphertexts, seenKEs) + ct.WriteHash(h) + err = mixing.SignMessage(ct, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(ct) + if err != nil { + t.Fatal(err) + } + + mcounts := []uint32{mcount} + revealedKeys := &mixing.RevealedKeys{ + ECDHPublicKeys: []*secp256k1.PublicKey{kx.ECDHPublicKey}, + Ciphertexts: ciphertexts, + MyIndex: 0, + } + secrets, err := kx.SharedSecrets(revealedKeys, sid[:], run, mcounts) + if err != nil { + t.Fatal(err) + } + + // Pad SR messages + srmix := make([][]*big.Int, mcount) + myStart := uint32(0) + for i := uint32(0); i < mcount; i++ { + pads := mixing.SRMixPads(secrets.SRSecrets[i], myStart+i) + padded := mixing.SRMix(srMsg[i], pads) + srmix[i] = make([]*big.Int, len(padded)) + copy(srmix[i], padded) + } + srmixBytes := make([][][]byte, len(srmix)) + for i := range srmix { + srmixBytes[i] = make([][]byte, len(srmix[i])) + for j := range srmix[i] { + srmixBytes[i][j] = srmix[i][j].Bytes() + } + } + t.Logf("SR mix %+x\n", srmix) + + seenCTs := []chainhash.Hash{ct.Hash()} + sr := wire.NewMsgMixSlotReserve(identity, sid, run, srmixBytes, seenCTs) + sr.WriteHash(h) + err = mixing.SignMessage(sr, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(sr) + if err != nil { + t.Fatal(err) + } + + vs := srmix + powerSums := mixing.AddVectors(vs...) + coeffs := mixing.Coefficients(powerSums) + if err := solverrpc.StartSolver(); err != nil { + t.Skipf("Cannot continue test; unable to start solver: %v", err) + } + roots, err := solverrpc.Roots(coeffs, mixing.F) + if err != nil { + t.Fatal(err) + } + t.Logf("solved roots %+x\n", roots) + + // Pad DC messages + dcmix := make([]wire.MixVect, mcount) + var slots []int + for i := 0; i < int(mcount); i++ { + slots = append(slots, i) + } + for i, slot := range slots { + my := myStart + uint32(i) + pads := mixing.DCMixPads(secrets.DCSecrets[i], my) + dcmix[i] = wire.MixVect(mixing.DCMix(pads, dcMsg[i], uint32(slot))) + } + + seenSRs := []chainhash.Hash{sr.Hash()} + dc := wire.NewMsgMixDCNet(identity, sid, run, dcmix, seenSRs) + dc.WriteHash(h) + err = mixing.SignMessage(dc, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(dc) + if err != nil { + t.Fatal(err) + } + + dcVecs := make([]mixing.Vec, 0, len(dcmix)) + for _, vec := range dcmix { + dcVecs = append(dcVecs, mixing.Vec(vec)) + } + res := mixing.XorVectors(dcVecs) + t.Logf("recovered message set %v", res.String()) + + tx := wire.NewMsgTx() + for i := range res { + hash160 := res[i][:] + pkscript := []byte{ + 0: txscript.OP_DUP, + 1: txscript.OP_HASH160, + 2: txscript.OP_DATA_20, + 23: txscript.OP_EQUALVERIFY, + 24: txscript.OP_CHECKSIG, + } + copy(pkscript[3:23], hash160) + tx.AddTxOut(wire.NewTxOut(mixAmount, pkscript)) + } + t.Logf("mixed tx hash %v", tx.TxHash()) + + seenDCs := []chainhash.Hash{dc.Hash()} + cm := wire.NewMsgMixConfirm(identity, sid, run, tx, seenDCs) + cm.WriteHash(h) + err = mixing.SignMessage(cm, identityPriv) + if err != nil { + t.Fatal(err) + } + _, err = p.AcceptMessage(cm) + if err != nil { + t.Fatal(err) + } + + t.Logf("%s", spew.Sdump(tx)) +} diff --git a/mixing/mixpool/orphans_test.go b/mixing/mixpool/orphans_test.go new file mode 100644 index 0000000000..4374349bd8 --- /dev/null +++ b/mixing/mixpool/orphans_test.go @@ -0,0 +1,262 @@ +// Copyright (c) 2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package mixpool + +import ( + "errors" + "reflect" + "testing" + + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/chaincfg/v3" + "github.com/decred/dcrd/crypto/blake256" + "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/wire" +) + +var ( + testStartingHeight uint32 = 100 + testStartingBlock = chainhash.Hash{100} +) + +var testnetParams = chaincfg.TestNet3Params() + +type testBlockchain struct{} + +func newTestBlockchain() *testBlockchain { + return &testBlockchain{} +} + +func (b *testBlockchain) CurrentTip() (chainhash.Hash, int64) { + return testStartingBlock, int64(testStartingHeight) +} + +func (b *testBlockchain) ChainParams() *chaincfg.Params { + return testnetParams +} + +// Intentionally create orphans and test their acceptance behavior when PRs +// and KEs are accepted. +func TestOrphans(t *testing.T) { + pub, priv, err := generateSecp256k1(nil) + if err != nil { + t.Fatal(err) + } + id := *(*[33]byte)(pub.SerializeCompressed()) + + h := blake256.New() + + pr := &wire.MsgMixPairReq{ + Identity: id, + UTXOs: []wire.MixPairReqUTXO{ + {}, + }, + MessageCount: 1, + Expiry: testStartingHeight + 10, + ScriptClass: string(mixing.ScriptClassP2PKHv0), + InputValue: 1 << 18, + } + err = mixing.SignMessage(pr, priv) + if err != nil { + t.Fatal(err) + } + pr.WriteHash(h) + + prs := []*wire.MsgMixPairReq{pr} + epoch := uint64(1704067200) + sid := mixing.SortPRsForSession(prs, epoch) + ke1 := &wire.MsgMixKeyExchange{ + Identity: id, + SeenPRs: []chainhash.Hash{ + pr.Hash(), + }, + SessionID: sid, + Epoch: epoch, + Run: 0, + } + err = mixing.SignMessage(ke1, priv) + if err != nil { + t.Fatal(err) + } + ke1.WriteHash(h) + + ke2 := &wire.MsgMixKeyExchange{ + Identity: id, + SeenPRs: []chainhash.Hash{ + pr.Hash(), + }, + SessionID: sid, + Epoch: epoch, + Run: 1, + } + err = mixing.SignMessage(ke2, priv) + if err != nil { + t.Fatal(err) + } + ke2.WriteHash(h) + + fp1 := &wire.MsgMixFactoredPoly{ + Identity: id, + SessionID: sid, + Run: 0, + } + err = mixing.SignMessage(fp1, priv) + if err != nil { + t.Fatal(err) + } + fp1.WriteHash(h) + + fp2 := &wire.MsgMixFactoredPoly{ + Identity: id, + SessionID: sid, + Run: 1, + } + err = mixing.SignMessage(fp2, priv) + if err != nil { + t.Fatal(err) + } + fp2.WriteHash(h) + + t.Logf("pr %s", pr.Hash()) + t.Logf("ke1 %s", ke1.Hash()) + t.Logf("ke2 %s", ke2.Hash()) + t.Logf("fp1 %s", fp1.Hash()) + t.Logf("fp2 %s", fp2.Hash()) + + // Create a pair request, several KEs, and later messages belong to + // the session and run increment for each KE. Test different + // combinations of acceptance order to test orphan processing of + // various message types. + type accept struct { + desc string + message mixing.Message + errors bool + errAs interface{} + accepted []mixing.Message + } + tests := [][]accept{ + // Accept KE, then PR, then FP + 0: {{ + desc: "accept KE before PR", + message: ke1, + errors: true, + errAs: new(MissingOwnPRError), + accepted: nil, + }, { + desc: "accept PR after KE; both should now process", + message: pr, + errors: false, + accepted: []mixing.Message{pr, ke1}, + }, { + desc: "accept future message in accepted KE session/run", + message: fp1, + errors: false, // maybe later. + accepted: []mixing.Message{fp1}, + }}, + + // Accept FP, then KE, then PR + 1: {{ + desc: "accept FP first", + message: fp1, + errors: false, + accepted: nil, + }, { + desc: "accept KE", + message: ke1, + errors: true, + errAs: new(MissingOwnPRError), + accepted: nil, + }, { + desc: "accept PR; all should now be processed", + message: pr, + errors: false, + accepted: []mixing.Message{pr, ke1, fp1}, + }}, + + // Accept PR, then FP1, then FP2, then KE1, then KE2. + 2: {{ + desc: "accept PR first", + message: pr, + errors: false, + accepted: []mixing.Message{pr}, + }, { + desc: "accept FP1", + message: fp1, + errors: false, + accepted: nil, + }, { + desc: "accept FP2", + message: fp2, + errors: false, + accepted: nil, + }, { + desc: "accept KE1", + message: ke1, + errors: false, + accepted: []mixing.Message{ke1, fp1}, + }, { + desc: "accept KE2", + message: ke2, + errors: false, + accepted: []mixing.Message{ke2, fp2}, + }}, + + 3: {{ + desc: "accept FP1", + message: fp1, + errors: false, + accepted: nil, + }, { + desc: "accept FP2", + message: fp2, + errors: false, + accepted: nil, + }, { + desc: "accept KE1", + message: ke1, + errors: true, + errAs: new(MissingOwnPRError), + accepted: nil, + }, { + desc: "accept KE2", + message: ke2, + errors: true, + errAs: new(MissingOwnPRError), + accepted: nil, + }, { + desc: "accept PR last", + message: pr, + errors: false, + accepted: []mixing.Message{pr, ke1, ke2, fp1, fp2}, + }}, + } + + for i, accepts := range tests { + t.Logf("test %d", i) + mp := NewPool(newTestBlockchain()) + + for j, a := range accepts { + accepted, err := mp.AcceptMessage(a.message) + if err != nil != a.errors { + t.Errorf("test %d call %d %q: unexpected error: %v", i, j, a.desc, err) + } + if a.errors && !errors.As(err, &a.errAs) { + t.Errorf("test %d call %d %q: unexpected error: %v", i, j, a.desc, err) + } + if len(accepted) != len(a.accepted) { + t.Logf("orphans: %v", mp.orphans) + t.Logf("orphansByID: %v", mp.orphansByID) + t.Logf("pr: %v", pr) + t.Logf("ke2: %v", ke2) + t.Logf("fp2: %v", fp2) + t.Errorf("test %d call %d %q: accepted lengths differ %d != %d", i, j, a.desc, + len(accepted), len(a.accepted)) + } + if !reflect.DeepEqual(accepted, a.accepted) { + t.Errorf("test %d call %d %q: accepted messages differs: %#v", i, j, a.desc, accepted) + } + } + } +} diff --git a/mixing/utxoproof/utxoproof.go b/mixing/utxoproof/utxoproof.go new file mode 100644 index 0000000000..e0a33ec38e --- /dev/null +++ b/mixing/utxoproof/utxoproof.go @@ -0,0 +1,90 @@ +// Copyright (c) 2023 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package utxoproof + +import ( + "encoding/binary" + + "github.com/decred/dcrd/crypto/blake256" + "github.com/decred/dcrd/dcrec/secp256k1/v4" + "github.com/decred/dcrd/dcrec/secp256k1/v4/schnorr" +) + +// Tags and schemes describing the message being signed. +// +// These strings must not contain the comma, which is reserved as a separator +// character. +const ( + tag = "mixpr-utxoproof" + + // schemes + secp256k1P2PKH = "P2PKH(EC-Schnorr-DCRv0)" +) + +var sep = []byte{','} + +// The signature hash is created from the serialization of: +// tag , scheme , expiry pubkey +// No separator is written after expiry; it is fixed length. + +// Secp256k1KeyPair provides access to the serialized public key and parsed +// private key of a secp256k1 key pair. +type Secp256k1KeyPair struct { + Pub []byte + Priv *secp256k1.PrivateKey +} + +// SignUtxoProof returns the UTXO proof of ownership over an output controlled +// by the keypair. The UTXO proof is only valid for the provided expiry +// height to prevent its inclusion in other PR messages signed by an unrelated +// identity. +func (k *Secp256k1KeyPair) SignUtxoProof(expires uint32) ([]byte, error) { + const scheme = secp256k1P2PKH + + h := blake256.New() + h.Write([]byte(tag)) + h.Write(sep) + h.Write([]byte(scheme)) + h.Write(sep) + expiresBytes := binary.BigEndian.AppendUint32(make([]byte, 0, 4), expires) + h.Write(expiresBytes) + h.Write(k.Pub) + hash := h.Sum(nil) + + sig, err := schnorr.Sign(k.Priv, hash) + if err != nil { + return nil, err + } + + return sig.Serialize(), nil +} + +// ValidateSecp256k1P2PKH validates the UTXO proof of an output controlled by +// a secp256k1 keypair for the given expiry height. Returns true only if the +// proof is valid. +func ValidateSecp256k1P2PKH(pubkey, proof []byte, expires uint32) bool { + const scheme = secp256k1P2PKH + + pubkeyParsed, err := secp256k1.ParsePubKey(pubkey) + if err != nil { + return false + } + proofParsed, err := schnorr.ParseSignature(proof) + if err != nil { + return false + } + + h := blake256.New() + h.Write([]byte(tag)) + h.Write(sep) + h.Write([]byte(scheme)) + h.Write(sep) + expiresBytes := binary.BigEndian.AppendUint32(make([]byte, 0, 4), expires) + h.Write(expiresBytes) + h.Write(pubkey) + hash := h.Sum(nil) + + return proofParsed.Verify(hash, pubkeyParsed) +}