diff --git a/.gitignore b/.gitignore index 47b35066cf..f2d27c0f8d 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ client/asset/btc/electrum/example/wallet/wallet client/asset/eth/cmd/getgas/getgas client/asset/eth/cmd/deploy/deploy client/cmd/dexc-desktop/pkg/installers +server/noderelay/cmd/sourcenode/sourcenode diff --git a/client/comms/wsconn.go b/client/comms/wsconn.go index c8ea235df5..b3e3f495c7 100644 --- a/client/comms/wsconn.go +++ b/client/comms/wsconn.go @@ -92,6 +92,7 @@ type WsConn interface { NextID() uint64 IsDown() bool Send(msg *msgjson.Message) error + SendRaw(b []byte) error Request(msg *msgjson.Message, respHandler func(*msgjson.Message)) error RequestWithTimeout(msg *msgjson.Message, respHandler func(*msgjson.Message), expireTime time.Duration, expire func()) error Connect(ctx context.Context) (*sync.WaitGroup, error) @@ -134,6 +135,10 @@ type WsCfg struct { // NetDialContext specifies an optional dialer context to use. NetDialContext func(context.Context, string, string) (net.Conn, error) + + // RawHandler overrides the msgjson parsing and forwards all messages to + // the provided function. + RawHandler func([]byte) } // wsConn represents a client websocket connection. @@ -159,6 +164,8 @@ type wsConn struct { reconnectCh chan struct{} // trigger for immediate reconnect } +var _ WsConn = (*wsConn)(nil) + // NewWsConn creates a client websocket connection. func NewWsConn(cfg *WsCfg) (WsConn, error) { if cfg.PingWait < 0 { @@ -223,6 +230,7 @@ func (conn *wsConn) connect(ctx context.Context) error { } else { dialer.Proxy = http.ProxyFromEnvironment } + ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, nil) if err != nil { if isErrorInvalidCert(err) { @@ -278,12 +286,66 @@ func (conn *wsConn) connect(ctx context.Context) error { conn.wg.Add(1) go func() { defer conn.wg.Done() - conn.read(ctx) + if conn.cfg.RawHandler != nil { + conn.readRaw(ctx) + } else { + conn.read(ctx) + } + }() return nil } +func (conn *wsConn) SetReadLimit(limit int64) { + conn.wsMtx.Lock() + ws := conn.ws + conn.wsMtx.Unlock() + if ws != nil { + ws.SetReadLimit(limit) + } +} + +func (conn *wsConn) handleReadError(err error) { + reconnect := func() { + conn.setConnectionStatus(Disconnected) + conn.reconnectCh <- struct{}{} + } + + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL) + reconnect() + return + } + // TODO: Now that wsConn goroutines have contexts that are canceled + // on shutdown, we do not have to infer the source and severity of + // the error; just reconnect in ALL other cases, and remove the + // following legacy checks. + + // Expected close errors (1000 and 1001) ... but if the server + // closes we still want to reconnect. (???) + if websocket.IsCloseError(err, websocket.CloseGoingAway, + websocket.CloseNormalClosure) || + strings.Contains(err.Error(), "websocket: close sent") { + reconnect() + return + } + + var opErr *net.OpError + if errors.As(err, &opErr) && opErr.Op == "read" { + if strings.Contains(opErr.Err.Error(), "use of closed network connection") { + conn.log.Errorf("read quitting: %v", err) + reconnect() + return + } + } + + // Log all other errors and trigger a reconnection. + conn.log.Errorf("read error (%v), attempting reconnection", err) + reconnect() +} + func (conn *wsConn) close() { // Attempt to send a close message in case the connection is still live. msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "bye") @@ -293,14 +355,30 @@ func (conn *wsConn) close() { conn.ws.Close() } +func (conn *wsConn) readRaw(ctx context.Context) { + for { + if ctx.Err() != nil { + return + } + + // Lock since conn.ws may be set by connect. + conn.wsMtx.Lock() + ws := conn.ws + conn.wsMtx.Unlock() + + // Block until a message is received or an error occurs. + _, msgBytes, err := ws.ReadMessage() + if err != nil { + conn.handleReadError(err) + return + } + conn.cfg.RawHandler(msgBytes) + } +} + // read fetches and parses incoming messages for processing. This should be // run as a goroutine. Increment the wg before calling read. func (conn *wsConn) read(ctx context.Context) { - reconnect := func() { - conn.setConnectionStatus(Disconnected) - conn.reconnectCh <- struct{}{} - } - for { msg := new(msgjson.Message) @@ -317,49 +395,13 @@ func (conn *wsConn) read(ctx context.Context) { return } if err != nil { - // Read timeout should flag the connection as down asap. - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { - conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL) - reconnect() - return - } - var mErr *json.UnmarshalTypeError if errors.As(err, &mErr) { // JSON decode errors are not fatal, log and proceed. conn.log.Errorf("json decode error: %v", mErr) continue } - - // TODO: Now that wsConn goroutines have contexts that are canceled - // on shutdown, we do not have to infer the source and severity of - // the error; just reconnect in ALL other cases, and remove the - // following legacy checks. - - // Expected close errors (1000 and 1001) ... but if the server - // closes we still want to reconnect. (???) - if websocket.IsCloseError(err, websocket.CloseGoingAway, - websocket.CloseNormalClosure) || - strings.Contains(err.Error(), "websocket: close sent") { - reconnect() - return - } - - var opErr *net.OpError - if errors.As(err, &opErr) && opErr.Op == "read" { - if strings.Contains(opErr.Err.Error(), - "use of closed network connection") { - conn.log.Errorf("read quitting: %v", err) - reconnect() - return - } - } - - // Log all other errors and trigger a reconnection. - conn.log.Errorf("read error (%v), attempting reconnection", err) - reconnect() - // Successful reconnect via connect() will start read() again. + conn.handleReadError(err) return } @@ -367,7 +409,8 @@ func (conn *wsConn) read(ctx context.Context) { if msg.Type == msgjson.Response { handler := conn.respHandler(msg.ID) if handler == nil { - conn.log.Errorf("unhandled response with error msg: %v", handleUnknownResponse(msg)) + b, _ := json.Marshal(msg) + conn.log.Errorf("No handler found for response: %v", string(b)) continue } // Run handlers in a goroutine so that other messages can be @@ -496,12 +539,6 @@ func (conn *wsConn) Stop() { conn.cancel() } -func (conn *wsConn) SendRaw(b []byte) error { - conn.wsMtx.Lock() - defer conn.wsMtx.Unlock() - return conn.ws.WriteMessage(websocket.TextMessage, b) -} - // Send pushes outgoing messages over the websocket connection. Sending of the // message is synchronous, so a nil error guarantees that the message was // successfully sent. A non-nil error may indicate that the connection is known @@ -519,10 +556,14 @@ func (conn *wsConn) Send(msg *msgjson.Message) error { conn.log.Errorf("Failed to marshal message: %v", err) return err } + return conn.SendRaw(b) +} +// SendRaw sends a raw byte string over the websocket connection. +func (conn *wsConn) SendRaw(b []byte) error { conn.wsMtx.Lock() defer conn.wsMtx.Unlock() - err = conn.ws.SetWriteDeadline(time.Now().Add(writeWait)) + err := conn.ws.SetWriteDeadline(time.Now().Add(writeWait)) if err != nil { conn.log.Errorf("Send: failed to set write deadline: %v", err) return err @@ -636,13 +677,3 @@ func (conn *wsConn) respHandler(id uint64) *responseHandler { func (conn *wsConn) MessageSource() <-chan *msgjson.Message { return conn.readCh } - -// handleUnknownResponse extracts the error message sent for a response without -// a handler. -func handleUnknownResponse(msg *msgjson.Message) error { - resp, err := msg.Response() - if err != nil { - return err - } - return resp.Error -} diff --git a/client/core/core_test.go b/client/core/core_test.go index 5cf82553fd..087616ae5c 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -319,6 +319,10 @@ func (conn *TWebsocket) Send(msg *msgjson.Message) error { return conn.sendErr } + +func (conn *TWebsocket) SendRaw([]byte) error { + return conn.sendErr +} func (conn *TWebsocket) Request(msg *msgjson.Message, f msgFunc) error { return conn.RequestWithTimeout(msg, f, 0, func() {}) } diff --git a/dex/testing/dcrdex/harness.sh b/dex/testing/dcrdex/harness.sh index d3107b1abe..4045e87537 100755 --- a/dex/testing/dcrdex/harness.sh +++ b/dex/testing/dcrdex/harness.sh @@ -247,6 +247,29 @@ EOF else echo "WARNING: Dash is not running. Configuring dcrdex markets without DASH." fi +# run with NODERELAY=1 to use a node relay for the bitcoin node. +BTC_NODERELAY_ID="" +DCR_NODERELAY_ID="" +BTC_CONFIG_PATH="${TEST_ROOT}/btc/alpha/alpha.conf" +DCR_CONFIG_PATH="${TEST_ROOT}/dcr/alpha/dcrd.conf" +if [[ -n ${NODERELAY} ]]; then + BTC_NODERELAY_ID="btc_a21afba3" + DCR_NODERELAY_ID="dcr_a21afba3" + RELAY_CONF_PATH="${TEST_ROOT}/btc/alpha/alpha_noderelay.conf" + if [ ! -f "${RELAY_CONF_PATH}" ]; then + cp "${BTC_CONFIG_PATH}" "${RELAY_CONF_PATH}" + echo "rpcbind=noderelay:${BTC_NODERELAY_ID}" >> "${RELAY_CONF_PATH}" + fi + BTC_CONFIG_PATH="${RELAY_CONF_PATH}" + + RELAY_CONF_PATH="${TEST_ROOT}/dcr/alpha/dcrd_noderelay.conf" + if [ ! -f "${RELAY_CONF_PATH}" ]; then + cp "${DCR_CONFIG_PATH}" "${RELAY_CONF_PATH}" + echo "rpclisten=noderelay:${DCR_NODERELAY_ID}" >> "${RELAY_CONF_PATH}" + fi + DCR_CONFIG_PATH="${RELAY_CONF_PATH}" +fi + cat << EOF >> "./markets.json" } ], @@ -256,24 +279,26 @@ cat << EOF >> "./markets.json" "network": "simnet", "maxFeeRate": 10, "swapConf": 1, - "configPath": "${TEST_ROOT}/dcr/alpha/dcrd.conf", + "configPath": "${DCR_CONFIG_PATH}", "regConfs": 1, "regFee": 100000000, "regXPub": "spubVWKGn9TGzyo7M4b5xubB5UV4joZ5HBMNBmMyGvYEaoZMkSxVG4opckpmQ26E85iHg8KQxrSVTdex56biddqtXBerG9xMN8Dvb3eNQVFFwpE", "bondAmt": 1000000000, - "bondConfs": 1 + "bondConfs": 1, + "nodeRelayID": "${DCR_NODERELAY_ID}" }, "BTC_simnet": { "bip44symbol": "btc", "network": "simnet", "maxFeeRate": 100, "swapConf": 1, - "configPath": "${TEST_ROOT}/btc/alpha/alpha.conf", + "configPath": "${BTC_CONFIG_PATH}", "regConfs": 2, "regFee": 20000000, "regXPub": "vpub5SLqN2bLY4WeZJ9SmNJHsyzqVKreTXD4ZnPC22MugDNcjhKX5xNX9QiQWcE4SSRzVWyHWUihpKRT7hckDGNzVc69wSX2JPcfGeNiT5c2XZy", "bondAmt": 10000000, - "bondConfs": 1 + "bondConfs": 1, + "nodeRelayID": "${BTC_NODERELAY_ID}" EOF @@ -452,6 +477,7 @@ maxepochcancels=128 inittakerlotlimit=40 abstakerlotlimit=1200 httpprof=1 +noderelayaddr=127.0.0.1:17539 EOF # Set the postgres user pass if provided. @@ -508,6 +534,10 @@ export SHELL=$(which bash) cat > "${DCRDEX_DATA_DIR}/quit" < 0 { + return endpoints, nil + } return nil, err } defer file.Close() - assetName := strings.ToUpper(dex.BipIDSymbol(baseChainID)) + assetName := strings.ToUpper(dex.BipIDSymbol(cfg.AssetID)) - var endpoints []endpoint endpointsMap := make(map[string]bool) // to avoid duplicates scanner := bufio.NewScanner(file) for scanner.Scan() { @@ -285,6 +284,7 @@ func NewEVMBackend( if len(parts) < 1 || len(parts) > 2 { return nil, fmt.Errorf(ethCfgInstructions, assetName, line) } + url := strings.TrimSpace(parts[0]) var priority uint16 if len(parts) == 2 { @@ -304,12 +304,30 @@ func NewEVMBackend( }) } if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading %s config file at %q. %v", assetName, configPath, err) + return nil, fmt.Errorf("error reading %s config file at %q. %v", assetName, cfg.ConfigPath, err) } if len(endpoints) == 0 { - return nil, fmt.Errorf("no endpoint found in the %s config file at %q", assetName, configPath) + return nil, fmt.Errorf("no endpoint found in the %s config file at %q", assetName, cfg.ConfigPath) } - log.Debugf("Parsed %d endpoints from the %s config file", len(endpoints), assetName) + + return endpoints, nil +} + +// NewEVMBackend is the exported constructor by which the DEX will import the +// Backend. +func NewEVMBackend( + cfg *asset.BackendConfig, + contractAddrs map[uint32]map[dex.Network]common.Address, + vTokens map[uint32]*VersionedToken, +) (*ETHBackend, error) { + + endpoints, err := parseEndpoints(cfg) + if err != nil { + return nil, err + } + + baseChainID, net, log := cfg.AssetID, cfg.Net, cfg.Logger + assetName := strings.ToUpper(dex.BipIDSymbol(baseChainID)) netAddrs, found := contractAddrs[ethContractVersion] if !found { diff --git a/server/asset/eth/eth_test.go b/server/asset/eth/eth_test.go index f4a8d8ea80..8a2052caf9 100644 --- a/server/asset/eth/eth_test.go +++ b/server/asset/eth/eth_test.go @@ -903,3 +903,78 @@ func TestIsRemoteURL(t *testing.T) { } } } + +func TestParseEndpoints(t *testing.T) { + type test struct { + name string + fileContents string + relayAddr string + expectedEndpoints []string + wantErr bool + } + + url1 := "http://127.0.0.1:1234" + url2 := "https://example.com" + relayAddr := "123.111.4.8:1111" + relayURL := "http://" + relayAddr + + tests := []*test{ + { + name: "single localhost in file", + fileContents: url1, + expectedEndpoints: []string{"http://127.0.0.1:1234"}, + }, + { + name: "no path provided error", + wantErr: true, + }, + { + name: "two from file and a noderelay", + fileContents: url1 + "\n" + url2, + relayAddr: relayAddr, + expectedEndpoints: []string{relayURL, url1, url2}, + }, + { + name: "just a relay adddress", + relayAddr: relayAddr, + expectedEndpoints: []string{relayURL}, + }, + } + + runTest := func(t *testing.T, tt *test) { + var configPath string + if tt.fileContents != "" { + f, err := os.CreateTemp("", "") + if err != nil { + t.Fatalf("error getting temporary file") + } + configPath = f.Name() + defer os.Remove(configPath) + defer f.Close() + f.WriteString(tt.fileContents) + } + endpoints, err := parseEndpoints(&asset.BackendConfig{ + ConfigPath: configPath, + RelayAddr: tt.relayAddr, + }) + if err != nil { + if tt.wantErr { + return + } + t.Fatalf("parseEndpoints error: %v", err) + } + if len(endpoints) != len(tt.expectedEndpoints) { + t.Fatalf("wrong number of endpoints. wanted %d, got %d", len(tt.expectedEndpoints), len(endpoints)) + } + for i, pt := range endpoints { + if expURL := tt.expectedEndpoints[i]; pt.url != expURL { + t.Fatalf("wrong endpoint at index %d: wanted %s, got %s", i, expURL, pt.url) + } + } + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + runTest(t, tt) + }) + } +} diff --git a/server/asset/firo/firo.go b/server/asset/firo/firo.go index 0e34c70f35..c4c2c04f30 100644 --- a/server/asset/firo/firo.go +++ b/server/asset/firo/firo.go @@ -25,8 +25,8 @@ import ( type Driver struct{} // Setup creates the DGB backend. Start the backend with its Run method. -func (d *Driver) Setup(configPath string, logger dex.Logger, network dex.Network) (asset.Backend, error) { - return NewBackend(configPath, logger, network) +func (d *Driver) Setup(cfg *asset.BackendConfig) (asset.Backend, error) { + return NewBackend(cfg) } // DecodeCoinID creates a human-readable representation of a coin ID for @@ -58,9 +58,9 @@ const ( // NewBackend generates the network parameters and creates a dgb backend as a // btc clone using an asset/btc helper function. -func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asset.Backend, error) { +func NewBackend(cfg *asset.BackendConfig) (asset.Backend, error) { var params *chaincfg.Params - switch network { + switch cfg.Net { case dex.Mainnet: params = dexfiro.MainNetParams case dex.Testnet: @@ -68,7 +68,7 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse case dex.Regtest: params = dexfiro.RegressionNetParams default: - return nil, fmt.Errorf("unknown network ID %v", network) + return nil, fmt.Errorf("unknown network ID %v", cfg.Net) } // Designate the clone ports. @@ -78,6 +78,7 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse Simnet: "28888", } + configPath := cfg.ConfigPath if configPath == "" { configPath = dexbtc.SystemConfigPath("firo") } @@ -86,8 +87,8 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse Name: assetName, Segwit: false, ConfigPath: configPath, - Logger: logger, - Net: network, + Logger: cfg.Logger, + Net: cfg.Net, ChainParams: params, Ports: ports, // 2 blocks should be enough - Firo has masternode 1 block finalize @@ -102,7 +103,7 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse // apparently, and we don't have getblockstats, so we need to scan // blocks on testnet. BlockFeeTransactions: func(rc *btc.RPCClient, blockHash *chainhash.Hash) ([]btc.FeeTx, chainhash.Hash, error) { - return btcBlockFeeTransactions(rc, blockHash, network) + return btcBlockFeeTransactions(rc, blockHash, cfg.Net) }, MaxFeeBlocks: 16, // failsafe BooleanGetBlockRPC: true, @@ -112,8 +113,9 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse // It also doesn't accept an estimate_mode argument. // Neither estimatesmartfee or estimatefee work on testnet v0.14.12.4, // but estimatefee works on simnet, and on mainnet v0.14.12.1. - DumbFeeEstimates: true}, - ) + DumbFeeEstimates: true, + RelayAddr: cfg.RelayAddr, + }) } // Firo v0.14.12.1 defaults: diff --git a/server/asset/ltc/live_test.go b/server/asset/ltc/live_test.go index 30da6dd33f..a5b9467abd 100644 --- a/server/asset/ltc/live_test.go +++ b/server/asset/ltc/live_test.go @@ -30,6 +30,7 @@ import ( "testing" "decred.org/dcrdex/dex" + "decred.org/dcrdex/server/asset" "decred.org/dcrdex/server/asset/btc" ) @@ -50,7 +51,11 @@ func TestMain(m *testing.M) { }() logger := dex.StdOutLogger("LTCTEST", dex.LevelTrace) - dexAsset, err := NewBackend("", logger, dex.Mainnet) + dexAsset, err := NewBackend(&asset.BackendConfig{ + AssetID: BipID, + Logger: logger, + Net: dex.Mainnet, + }) if err != nil { fmt.Printf("NewBackend error: %v\n", err) return 1 diff --git a/server/asset/ltc/ltc.go b/server/asset/ltc/ltc.go index 7ecfffe0de..dfa4e0c082 100644 --- a/server/asset/ltc/ltc.go +++ b/server/asset/ltc/ltc.go @@ -18,8 +18,8 @@ import ( type Driver struct{} // Setup creates the LTC backend. Start the backend with its Run method. -func (d *Driver) Setup(configPath string, logger dex.Logger, network dex.Network) (asset.Backend, error) { - return NewBackend(configPath, logger, network) +func (d *Driver) Setup(cfg *asset.BackendConfig) (asset.Backend, error) { + return NewBackend(cfg) } // DecodeCoinID creates a human-readable representation of a coin ID for @@ -51,9 +51,9 @@ const ( // NewBackend generates the network parameters and creates a ltc backend as a // btc clone using an asset/btc helper function. -func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asset.Backend, error) { +func NewBackend(cfg *asset.BackendConfig) (asset.Backend, error) { var params *chaincfg.Params - switch network { + switch cfg.Net { case dex.Mainnet: params = dexltc.MainNetParams case dex.Testnet: @@ -61,7 +61,7 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse case dex.Regtest: params = dexltc.RegressionNetParams default: - return nil, fmt.Errorf("unknown network ID %v", network) + return nil, fmt.Errorf("unknown network ID %v", cfg.Net) } // Designate the clone ports. These will be overwritten by any explicit @@ -72,6 +72,7 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse Simnet: "19443", } + configPath := cfg.ConfigPath if configPath == "" { configPath = dexbtc.SystemConfigPath("litecoin") } @@ -80,8 +81,8 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse Name: assetName, Segwit: true, ConfigPath: configPath, - Logger: logger, - Net: network, + Logger: cfg.Logger, + Net: cfg.Net, ChainParams: params, Ports: ports, BlockDeserializer: dexltc.DeserializeBlockBytes, @@ -90,5 +91,6 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse // anyway. FeeConfs: 2, MaxFeeBlocks: 20, + RelayAddr: cfg.RelayAddr, }) } diff --git a/server/asset/polygon/polygon.go b/server/asset/polygon/polygon.go index 183ba79271..485d35533d 100644 --- a/server/asset/polygon/polygon.go +++ b/server/asset/polygon/polygon.go @@ -74,6 +74,6 @@ type Driver struct { } // Setup creates the ETH backend. Start the backend with its Run method. -func (d *Driver) Setup(configPath string, logger dex.Logger, net dex.Network) (asset.Backend, error) { - return eth.NewEVMBackend(BipID, configPath, logger, dexpolygon.ContractAddresses, registeredTokens, net) +func (d *Driver) Setup(cfg *asset.BackendConfig) (asset.Backend, error) { + return eth.NewEVMBackend(cfg, dexpolygon.ContractAddresses, registeredTokens) } diff --git a/server/asset/zec/live_test.go b/server/asset/zec/live_test.go index 1f782c0891..495020a3c6 100644 --- a/server/asset/zec/live_test.go +++ b/server/asset/zec/live_test.go @@ -25,6 +25,7 @@ import ( "testing" "decred.org/dcrdex/dex" + "decred.org/dcrdex/server/asset" "decred.org/dcrdex/server/asset/btc" ) @@ -37,7 +38,11 @@ func TestMain(m *testing.M) { // Wrap everything for defers. doIt := func() int { logger := dex.StdOutLogger("ZECTEST", dex.LevelTrace) - be, err := NewBackend("", logger, dex.Mainnet) + be, err := NewBackend(&asset.BackendConfig{ + AssetID: BipID, + Logger: logger, + Net: dex.Mainnet, + }) if err != nil { fmt.Printf("NewBackend error: %v\n", err) return 1 diff --git a/server/asset/zec/zec.go b/server/asset/zec/zec.go index 8591d67223..21506f7041 100644 --- a/server/asset/zec/zec.go +++ b/server/asset/zec/zec.go @@ -24,8 +24,8 @@ import ( type Driver struct{} // Setup creates the Zcash backend. Start the backend with its Run method. -func (d *Driver) Setup(configPath string, logger dex.Logger, network dex.Network) (asset.Backend, error) { - return NewBackend(configPath, logger, network) +func (d *Driver) Setup(cfg *asset.BackendConfig) (asset.Backend, error) { + return NewBackend(cfg) } // DecodeCoinID creates a human-readable representation of a coin ID for @@ -58,10 +58,10 @@ const ( // NewBackend generates the network parameters and creates a zec backend as a // btc clone using an asset/btc helper function. -func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asset.Backend, error) { +func NewBackend(cfg *asset.BackendConfig) (asset.Backend, error) { var btcParams *chaincfg.Params var addrParams *dexzec.AddressParams - switch network { + switch cfg.Net { case dex.Mainnet: btcParams = dexzec.MainNetParams addrParams = dexzec.MainNetAddressParams @@ -72,7 +72,7 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse btcParams = dexzec.RegressionNetParams addrParams = dexzec.RegressionNetAddressParams default: - return nil, fmt.Errorf("unknown network ID %v", network) + return nil, fmt.Errorf("unknown network ID %v", cfg.Net) } // Designate the clone ports. These will be overwritten by any explicit @@ -83,6 +83,7 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse Simnet: "18232", } + configPath := cfg.ConfigPath if configPath == "" { configPath = dexbtc.SystemConfigPath("zcash") } @@ -91,8 +92,8 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse Name: assetName, Segwit: false, ConfigPath: configPath, - Logger: logger, - Net: network, + Logger: cfg.Logger, + Net: cfg.Net, ChainParams: btcParams, Ports: ports, AddressDecoder: func(addr string, net *chaincfg.Params) (btcutil.Address, error) { @@ -117,6 +118,7 @@ func NewBackend(configPath string, logger dex.Logger, network dex.Network) (asse BlockFeeTransactions: blockFeeTransactions, NumericGetRawRPC: true, ShieldedIO: shieldedIO, + RelayAddr: cfg.RelayAddr, }) if err != nil { return nil, err diff --git a/server/cmd/dcrdex/config.go b/server/cmd/dcrdex/config.go index a6385ead72..149d2cea2a 100644 --- a/server/cmd/dcrdex/config.go +++ b/server/cmd/dcrdex/config.go @@ -99,6 +99,7 @@ type dexConf struct { AdminSrvPW []byte NoResumeSwaps bool DisableDataAPI bool + NodeRelayAddr string } type flagsData struct { @@ -150,6 +151,8 @@ type flagsData struct { NoResumeSwaps bool `long:"noresumeswaps" description:"Do not attempt to resume swaps that are active in the DB."` DisableDataAPI bool `long:"nodata" description:"Disable the HTTP data API."` + + NodeRelayAddr string `long:"noderelayaddr" description:"The public address by which node sources should connect to the node relay"` } // supportedSubsystems returns a sorted slice of the supported subsystems for @@ -557,6 +560,7 @@ func loadConfig() (*dexConf, *procOpts, error) { AdminSrvPW: []byte(cfg.AdminSrvPassword), NoResumeSwaps: cfg.NoResumeSwaps, DisableDataAPI: cfg.DisableDataAPI, + NodeRelayAddr: cfg.NodeRelayAddr, } opts := &procOpts{ diff --git a/server/cmd/dcrdex/main.go b/server/cmd/dcrdex/main.go index d108c22d20..096ea997d6 100644 --- a/server/cmd/dcrdex/main.go +++ b/server/cmd/dcrdex/main.go @@ -152,6 +152,7 @@ func mainCore(ctx context.Context) error { HiddenServiceAddr: cfg.HiddenService, }, NoResumeSwaps: cfg.NoResumeSwaps, + NodeRelayAddr: cfg.NodeRelayAddr, } dexMan, err := dexsrv.NewDEX(ctx, dexConf) // ctx cancel just aborts setup; Stop does normal shutdown if err != nil { diff --git a/server/dex/dex.go b/server/dex/dex.go index ecd7a13256..abbbcd5256 100644 --- a/server/dex/dex.go +++ b/server/dex/dex.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "path/filepath" "strconv" "strings" "sync" @@ -25,6 +26,7 @@ import ( "decred.org/dcrdex/server/db" "decred.org/dcrdex/server/db/driver/pg" "decred.org/dcrdex/server/market" + "decred.org/dcrdex/server/noderelay" "decred.org/dcrdex/server/swap" "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/decred/dcrd/dcrec/secp256k1/v4/ecdsa" @@ -56,6 +58,7 @@ type AssetConf struct { BondAmt uint64 `json:"bondAmt,omitempty"` BondConfs uint32 `json:"bondConfs,omitempty"` Disabled bool `json:"disabled"` + NodeRelayID string `json:"nodeRelayID,omitempty"` } // DBConf groups the database configuration parameters. @@ -89,6 +92,7 @@ type DexConf struct { DEXPrivKey *secp256k1.PrivateKey CommsCfg *RPCConfig NoResumeSwaps bool + NodeRelayAddr string } type signer struct { @@ -300,6 +304,7 @@ func NewDEX(ctx context.Context, cfg *DexConf) (*DEX, error) { // Check each configured asset. assetIDs := make([]uint32, len(cfg.Assets)) + var nodeRelayIDs []string for i, assetConf := range cfg.Assets { symbol := strings.ToLower(assetConf.Symbol) @@ -324,6 +329,10 @@ func NewDEX(ctx context.Context, cfg *DexConf) (*DEX, error) { return nil, fmt.Errorf("max fee rate of 0 is invalid for asset %q", symbol) } + if assetConf.NodeRelayID != "" { + nodeRelayIDs = append(nodeRelayIDs, assetConf.NodeRelayID) + } + assetIDs[i] = assetID } @@ -383,6 +392,41 @@ func NewDEX(ctx context.Context, cfg *DexConf) (*DEX, error) { return nil, fmt.Errorf("db.Open: %w", err) } + relayAddrs := make(map[string]string, len(nodeRelayIDs)) + if len(nodeRelayIDs) > 0 { + nexusPort := "17537" + switch cfg.Network { + case dex.Testnet: + nexusPort = "17538" + case dex.Simnet: + nexusPort = "17539" + } + relayDir := filepath.Join(cfg.DataDir, "noderelay") + relay, err := noderelay.NewNexus(&noderelay.NexusConfig{ + ExternalAddr: cfg.NodeRelayAddr, + Dir: relayDir, + Port: nexusPort, + Logger: dex.StdOutLogger("T", dex.LevelDebug), + RelayIDs: nodeRelayIDs, + }) + if err != nil { + return nil, fmt.Errorf("error creating node relay: %w", err) + } + if err := startSubSys("Node relay", relay); err != nil { + return nil, fmt.Errorf("error starting node relay: %w", err) + } + select { + case <-relay.WaitForSourceNodes(): + case <-ctx.Done(): + return nil, ctx.Err() + } + for _, relayID := range nodeRelayIDs { + if relayAddrs[relayID], err = relay.RelayAddr(relayID); err != nil { + return nil, fmt.Errorf("error getting relay address for ID %s: %w", relayID, err) + } + } + } + dataAPI := apidata.NewDataAPI(storage) // Create a MasterCoinLocker for each asset. @@ -432,7 +476,14 @@ func NewDEX(ctx context.Context, cfg *DexConf) (*DEX, error) { return fmt.Errorf("failed to setup token %q: %w", symbol, err) } } else { - be, err = asset.Setup(assetID, assetConf.ConfigPath, logger, cfg.Network) + cfg := &asset.BackendConfig{ + AssetID: assetID, + ConfigPath: assetConf.ConfigPath, + Logger: logger, + Net: cfg.Network, + RelayAddr: relayAddrs[assetConf.NodeRelayID], + } + be, err = asset.Setup(cfg) if err != nil { return fmt.Errorf("failed to setup asset %q: %w", symbol, err) } diff --git a/server/noderelay/README.md b/server/noderelay/README.md new file mode 100644 index 0000000000..8e72287115 --- /dev/null +++ b/server/noderelay/README.md @@ -0,0 +1,33 @@ +# NodeRelay + +NodeRelay is a system for connecting to nodes on remote private machines. +This is done through a WebSockets-based reverse tunnel. + +1. Run a full node on a home or otherwise private server. Coordinate credentials +for RPC requests (`rcpuser`, `rpcpassword`) with the server operator. + +2. The server operator will expose the NodeRelay through an external IP address. + +1. The server operator generates a **relay ID**, which can be any string without +whitespace. Each asset backend that will connect through NodeRelay will need its +own **relay ID**. + +1. The server operator modifies the asset configuration to specify a relay ID +for each asset for which a noderelay is needed. + +1. The server operator specifies a `noderelayaddr` in as part of the dcrdex +configuration. This is the external address at which source nodes will contact +the server. + +1. Upon starting, the server will generate and store a **relayfile** to a +directory located by default at `~/.dcrdex/data/mainnet/noderelay/relay-files/`, +with a file name of e.g. `btc_0405f1069d352a0f.relayfile`. Send this file to +the private server. The **relayfile** is good until the relay ID is changed or +a new TLS key-cert pair is generated. + +1. On the private server, run `sourcenode`, pointing at the relay file with +`--relayfile` and setting the node RPC port with `--port`. Specify a TLS +certificate for full node RPC with `--localcert`, if required. + ``` + ./sourcenode --relayfile btc_0405f1069d352a0f.relayfile --port 8332 + ``` diff --git a/server/noderelay/cmd/sourcenode/main.go b/server/noderelay/cmd/sourcenode/main.go new file mode 100644 index 0000000000..50a909b874 --- /dev/null +++ b/server/noderelay/cmd/sourcenode/main.go @@ -0,0 +1,278 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +// sourcenode is the client for a NodeRelay. A NodeRelay is a remote server that +// can request data from the API of a local service, presumably running on a +// private machine which is not accessible from any static IP address or domain. +// In this inverted consumer-provider model, the provider connects to the +// consumer, and then accepts data requests over WebSockets, routing them to the +// local service and responding with the results. + +package main + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "net/http" + "net/url" + "os" + "os/signal" + "sync/atomic" + "time" + + "decred.org/dcrdex/client/comms" + "decred.org/dcrdex/dex" + "decred.org/dcrdex/server/noderelay" +) + +var log = dex.StdOutLogger("NODESRC", dex.LevelDebug) + +func main() { + if err := mainErr(); err != nil { + fmt.Fprint(os.Stderr, err, "\n") + os.Exit(1) + } + os.Exit(0) +} + +func mainErr() (err error) { + var ( + // port is a required argument. + port string + // optional, but required for e.g. dcrd, which uses an encrypted + // connection. + localNodeCert string + + // User can provide NodeRelay server configuration in one of two ways. + // 1) nexusAddr + relayID + certPath + nexusAddr string + relayID string + certPath string // NodeRelay's TLS certificate + // 2) relayfile, provided by NodeRelay operator + relayFilepath string + ) + + // required + flag.StringVar(&port, "port", "", "The port that the local service is listening on") + // optional. (dcrd needs by default) + flag.StringVar(&localNodeCert, "localcert", "", "The path to a TLS certificate for the local service") // optional + // connect either this way... + flag.StringVar(&relayFilepath, "relayfile", "", "The path to a relay file (provided by the server)") + // or connect with these parameters + flag.StringVar(&relayID, "relayid", "", "The relay ID") + flag.StringVar(&certPath, "certpath", "", "The path to a TLS certificate for the server") + flag.StringVar(&nexusAddr, "addr", "", "The address to the server") + + flag.Parse() + + if port == "" { + return errors.New("no local port provided") + } + + var certB []byte + if relayFilepath != "" { + b, err := os.ReadFile(relayFilepath) + if err != nil { + return fmt.Errorf("error reading relay file @ %q: %w", relayFilepath, err) + } + var relayFile noderelay.RelayFile + if err := json.Unmarshal(b, &relayFile); err != nil { + return fmt.Errorf("error parsing relay file: %w", err) + } + relayID = relayFile.RelayID + certB = relayFile.Cert + nexusAddr = relayFile.Addr + } else { + if certPath == "" { + return errors.New("specify a --certpath") + } + var err error + certB, err = os.ReadFile(certPath) + if err != nil { + return fmt.Errorf("error reading server certificate at %q: %v", certPath, err) + } + } + + if port == "" { + return errors.New("specify the --port that the local service is listening on") + } + if relayID == "" { + return errors.New("specify a --relayid") + } + if nexusAddr == "" { + return errors.New("specify a --addr for the server") + } + + if len(certB) == 0 { + return errors.New("no server TLS certificate provided") + } + + registration, err := json.Marshal(&noderelay.RelayedMessage{ + MessageID: 0, // must be zero for registration. + Body: []byte(relayID), + }) + if err != nil { + return fmt.Errorf("error json-encoding registration message: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + killChan := make(chan os.Signal, 1) + signal.Notify(killChan, os.Interrupt) + go func() { + <-killChan + fmt.Println("Shutting down...") + cancel() + }() + + // Keep track of some basic stats. + var stats struct { + requests uint32 + errors uint32 + received uint64 + sent uint64 + } + + // Periodically print the node usage statistics. + go func() { + start := time.Now() + for { + select { + case <-time.After(time.Minute * 10): + case <-ctx.Done(): + return + } + log.Infof("%d requests, %.4g MB received, %.4g MB sent, %d errors in %s", + atomic.LoadUint32(&stats.requests), float64(atomic.LoadUint64(&stats.received))/1e6, + float64(atomic.LoadUint64(&stats.sent))/1e6, atomic.LoadUint32(&stats.errors), + time.Since(start)) + } + }() + + localNodeURL := "http://127.0.0.1" + ":" + port + httpClient := http.DefaultClient + if localNodeCert != "" { + localNodeURL = "https://127.0.0.1" + ":" + port + pem, err := os.ReadFile(localNodeCert) + if err != nil { + return err + } + + uri, err := url.Parse(localNodeURL) + if err != nil { + return fmt.Errorf("error parsing URL: %v", err) + } + + pool := x509.NewCertPool() + if ok := pool.AppendCertsFromPEM(pem); !ok { + return fmt.Errorf("invalid certificate file: %v", localNodeCert) + } + tlsConfig := &tls.Config{ + RootCAs: pool, + ServerName: uri.Hostname(), + } + + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + } + } + + // Define this now, so we can create a ReconnectSync function. + var cl comms.WsConn + + registerOrReregister := func() { + if err := cl.SendRaw(registration); err != nil { + log.Errorf("Error sending registration message: %v", err) + } + } + + cl, err = comms.NewWsConn(&comms.WsCfg{ + URL: "wss://" + nexusAddr, + PingWait: noderelay.PingPeriod * 2, + Cert: certB, + // On a disconnect, wsConn will attempt to reconnect immediately. If + // the first attempt is unsuccessful, it will wait 5 seconds for next + // success, then 10, 15 ... up to a minute. + // We'll just send the registration on reconnect. + ReconnectSync: registerOrReregister, + ConnectEventFunc: func(s comms.ConnectionStatus) {}, + Logger: dex.StdOutLogger("CL", dex.LevelDebug), + RawHandler: func(b []byte) { + atomic.AddUint64(&stats.received, uint64(len(b))) + atomic.AddUint32(&stats.requests, 1) + // Request received from server. + var msg noderelay.RelayedMessage + if err := json.Unmarshal(b, &msg); err != nil { + atomic.AddUint32(&stats.errors, 1) + log.Errorf("json unmarshal error: %v", err) + return + } + // Prepare mirrored request for local service. + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + req, err := http.NewRequestWithContext(ctx, msg.Method, localNodeURL, bytes.NewReader(msg.Body)) + if err != nil { + atomic.AddUint32(&stats.errors, 1) + log.Errorf("Error constructing request: %v", err) + return + } + req.Header = msg.Headers + // Send request to local service. + resp, err := httpClient.Do(req) + if err != nil { + atomic.AddUint32(&stats.errors, 1) + log.Errorf("error processing request: %v", err) + return + } + // Read response from local service and encode for the node relay. + b, err = io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + atomic.AddUint32(&stats.errors, 1) + log.Errorf("Error reading response: %v", err) + return + } + atomic.AddUint64(&stats.sent, uint64(len(b))) + encResp, err := json.Marshal(&noderelay.RelayedMessage{ + MessageID: msg.MessageID, + Body: b, + Headers: resp.Header, + }) + if err != nil { + log.Error("Error during json encoding: %v", err) + } + if err := cl.SendRaw(encResp); err != nil { + log.Errorf("SendRaw error: %v", err) + } + + }, + }) + + cm := dex.NewConnectionMaster(cl) + if err := cm.ConnectOnce(ctx); err != nil { + return fmt.Errorf("websocketHandler client connect: %v", err) + } + + // The default read limit is 1024, I think. + if ws, is := cl.(interface { + SetReadLimit(limit int64) + }); is { + const readLimit = 2_097_152 // 2 MiB + ws.SetReadLimit(readLimit) + } + + registerOrReregister() + + cm.Wait() + return nil +} diff --git a/server/noderelay/noderelay.go b/server/noderelay/noderelay.go new file mode 100644 index 0000000000..aa6dea744e --- /dev/null +++ b/server/noderelay/noderelay.go @@ -0,0 +1,661 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package noderelay + +import ( + "context" + "crypto/elliptic" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io" + "math/rand" + "net" + "net/http" + "os" + "path/filepath" + "regexp" + "sync" + "sync/atomic" + "time" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/ws" + "github.com/decred/dcrd/certgen" +) + +const ( + // PongWait is the time allowed to read the next pong message from the peer. + PongWait = 60 * time.Second + // PingPeriod sets the frequency on which websocket clients should ping. + PingPeriod = 30 * time.Second + // connectTimeoutSeconds is used to ensure timely connections. + connectTimeoutSeconds = 10 + // expireTime is the time after sending a request to the source node before + // we consider the request failed. + expireTime = time.Second * 30 +) + +// sourceNode represents a connected source node. +type sourceNode struct { + relayID string + addr string + cl *ws.WSLink + + reqMtx sync.Mutex + respHandlers map[uint64]*responseHandler +} + +// logReq stores the response handler in the respHandlers map. Requests to the +// client are associated with a response handler. +func (n *sourceNode) logReq(reqID uint64, respHandler func([]byte, map[string][]string), expire func()) { + n.reqMtx.Lock() + defer n.reqMtx.Unlock() + doExpire := func() { + // Delete the response handler, and call the provided expire function if + // *Nexus has not already retrieved the handler function for execution. + if n.expireRequest(reqID) { + expire() + } + } + n.respHandlers[reqID] = &responseHandler{ + f: respHandler, + expire: time.AfterFunc(expireTime, doExpire), + } +} + +// expireRequest expires the pending request. +func (n *sourceNode) expireRequest(reqID uint64) bool { + n.reqMtx.Lock() + defer n.reqMtx.Unlock() + _, removed := n.respHandlers[reqID] + delete(n.respHandlers, reqID) + return removed +} + +// respHandler gets the stored responseHandler, if it exists, else nil. +func (n *sourceNode) respHandler(reqID uint64) *responseHandler { + n.reqMtx.Lock() + defer n.reqMtx.Unlock() + cb, ok := n.respHandlers[reqID] + if ok { + // Stop the expiration Timer. If the Timer fired after respHandler was + // called, but we found the response handler in the map, wsLink.expire + // is waiting for the reqMtx lock and will return false, thus preventing + // the registered expire func from executing. + cb.expire.Stop() + delete(n.respHandlers, reqID) + } + return cb +} + +// responseHandler is a handler for the response from a sent WebSockets request. +type responseHandler struct { + f func([]byte, map[string][]string) + expire *time.Timer +} + +// nodeRelay manages source nodes. +type nodeRelay struct { + sync.RWMutex + sources map[string]*sourceNode +} + +type NexusConfig struct { + // ExternalAddr is the external IP:port address or host(:port) of the + // Nexus relay manager. The operator must configure this independently + // so that the External address is routed to the specified Port listening + // on all loopback interfaces. + ExternalAddr string + // Port is the port that Nexus will listen on. + Port string + // Dir is a directory to output relayfiles and generated TLS key-cert pairs. + Dir string + // Key is the path to a TLS key. If Key == "", a new key and certificate + // will be created in the Dir. The ExternalAddr will be added to the + // certificate as a host. If the ExternalAddr changes, a new certificate + // can be generated by deleting the old key-cert pair and restarting. + // Changing the ExternalAddr renders any previously generated relayfiles + // void. + Key string + // Cert is the path to a TLS certificate. See docs for Key. + Cert string + Logger dex.Logger + // RelayIDs are the relay IDs for which to start node relays. These can be + // any string the caller chooses. These relay IDs are given to source node + // operators (generally as part of a relayfile) and are used to configure + // their source nodes. The channel returned from WaitForSourceNodes will + // not close until there is at least one source node connected for every + // ID in RelayIDs. + RelayIDs []string +} + +// normalize checks sanity and sets defaults for the NexusConfig. +func (cfg *NexusConfig) normalize() error { + const ( + defaultNexusPort = "17537" + keyFilename = "relay.key" + certFilename = "relay.cert" + ) + if len(cfg.RelayIDs) == 0 { + return errors.New("no relays specified") + } + re := regexp.MustCompile(`\s`) + for _, relayID := range cfg.RelayIDs { + if re.MatchString(relayID) { + return fmt.Errorf("relay ID %q contains whitespace", relayID) + } + } + if cfg.Port == "" { + cfg.Port = defaultNexusPort + } + if cfg.Key == "" { + cfg.Key = filepath.Join(cfg.Dir, keyFilename) + } + if cfg.Cert == "" { + cfg.Cert = filepath.Join(cfg.Dir, certFilename) + } + return nil +} + +// prepareKeys loads the TLS certificate, creating a key-cert pair if necessary. +func (cfg *NexusConfig) prepareKeys() (*tls.Config, []byte, error) { + keyExists := dex.FileExists(cfg.Key) + certExists := dex.FileExists(cfg.Cert) + if certExists == !keyExists { + return nil, nil, fmt.Errorf("missing cert pair file") + } + if !keyExists { + // certgen will actually ignore the port, but we'll remove it for good + // measure. + var dnsNames []string + if cfg.ExternalAddr != "" { + host, _, err := net.SplitHostPort(cfg.ExternalAddr) + if err != nil { + return nil, nil, fmt.Errorf("error parsing public address: %v", err) + } + dnsNames = []string{host} + } + err := genCertPair(cfg.Cert, cfg.Key, dnsNames, cfg.Logger) + if err != nil { + return nil, nil, err + } + } + keypair, err := tls.LoadX509KeyPair(cfg.Cert, cfg.Key) + if err != nil { + return nil, nil, err + } + + certB, err := os.ReadFile(cfg.Cert) + if err != nil { + return nil, nil, fmt.Errorf("error loading certificate file contents: %v", err) + } + + // Prepare the TLS configuration. + return &tls.Config{ + Certificates: []tls.Certificate{keypair}, + MinVersion: tls.VersionTLS12, + }, certB, nil +} + +// Nexus is run on the server and manages a series of node relays. A source node +// will connect to the Nexus, making their services available for a local +// consumer. +type Nexus struct { + ctx context.Context + cfg *NexusConfig + tlsConfig *tls.Config + relayAddrs map[string]string + log dex.Logger + wg sync.WaitGroup + certB []byte + relayfileDir string + allNodesConnected chan struct{} + relays map[string]*nodeRelay +} + +// NewNexus is the constructor for a Nexus. +func NewNexus(cfg *NexusConfig) (*Nexus, error) { + if err := cfg.normalize(); err != nil { + return nil, err + } + relayfileDir := filepath.Join(cfg.Dir, "relay-files") + if err := os.MkdirAll(relayfileDir, 0700); err != nil { + return nil, fmt.Errorf("error creating relay file directory: %w", err) + } + tlsConfig, certB, err := cfg.prepareKeys() + if err != nil { + return nil, err + } + + relays := make(map[string]*nodeRelay, len(cfg.RelayIDs)) + for _, relayID := range cfg.RelayIDs { + relays[relayID] = &nodeRelay{ + sources: make(map[string]*sourceNode), + } + } + + return &Nexus{ + cfg: cfg, + tlsConfig: tlsConfig, + relayAddrs: make(map[string]string), + log: cfg.Logger, + relays: relays, + certB: certB, + relayfileDir: relayfileDir, + allNodesConnected: make(chan struct{}), + }, nil +} + +// RelayAddr returns the local address for relay, or an error if there is no +// server running for the given relay ID. +func (n *Nexus) RelayAddr(relayID string) (string, error) { + relayAddr, found := n.relayAddrs[relayID] + if !found { + return "", fmt.Errorf("no relay node found for ID %q", relayID) + } + return relayAddr, nil +} + +// monitorNodeConnections checks the status of relays once per second, and +// closes the allNodesConnected channel when every relay has at least one +// source node. +func (n *Nexus) monitorNodeConnections() { + nodeReport := func() (registered, unregistered []string) { + for relayID, relay := range n.relays { + relay.RLock() + if len(relay.sources) > 0 { + registered = append(registered, relayID) + } else { + unregistered = append(unregistered, relayID) + } + relay.RUnlock() + } + return + } + + n.log.Infof("Node relay waiting on %d source nodes to connect", len(n.relays)) + lastLog := time.Time{} + for { + if r, u := nodeReport(); len(u) == 0 { + close(n.allNodesConnected) + return + } else if time.Since(lastLog) > time.Minute { + lastLog = time.Now() + n.log.Infof("Node relay waiting on sources. %d / %d connected. Missing sources for relays %+v", len(r), len(r)+len(u), u) + } + select { + case <-time.After(time.Second): + case <-n.ctx.Done(): + return + } + } +} + +// WaitForSourceNodes returns a channel that will be closed when a source node +// has connected for all relays. +func (n *Nexus) WaitForSourceNodes() <-chan struct{} { + return n.allNodesConnected +} + +// RelayFile is used for encoding JSON relayfiles. A relayfile is a file that +// contains all the relevant connection information for a source node +// configuration. Nexus will generate a relayfile for each relay ID on startup. +type RelayFile struct { + RelayID string `json:"relayID"` + Cert dex.Bytes `json:"cert"` + Addr string `json:"addr"` +} + +// Connect starts the Nexus, creating a relay node for every relay ID. +func (n *Nexus) Connect(ctx context.Context) (*sync.WaitGroup, error) { + n.ctx = ctx + + log, wg := n.cfg.Logger, &n.wg + + inAddr := "0.0.0.0:" + n.cfg.Port + + // Create listener. + listener, err := tls.Listen("tcp", inAddr, n.tlsConfig) + if err != nil { + return nil, fmt.Errorf("can't listen on %s. nexus server quitting: %w", inAddr, err) + } + // Update the listening address in case a :0 was provided. + addr := listener.Addr().String() + + for _, relayID := range n.cfg.RelayIDs { + relayAddr, err := n.runRelayServer(relayID) + if err != nil { + return nil, fmt.Errorf("error running node server for relay ID %s", relayID) + } + n.relayAddrs[relayID] = relayAddr + + relayfilePath := filepath.Join(n.relayfileDir, relayID+".relayfile") + + b, err := json.Marshal(&RelayFile{ + RelayID: relayID, + Cert: n.certB, + Addr: n.cfg.ExternalAddr, + }) + if err != nil { + n.log.Errorf("error encoding relay file: %v", err) + } else if err = os.WriteFile(relayfilePath, b, 0600); err != nil { + n.log.Errorf("error writing relay file: %v", err) + } + } + + srv := &http.Server{ + Handler: http.HandlerFunc(n.handleSourceConnect), + ReadTimeout: connectTimeoutSeconds * time.Second, // slow requests should not hold connections opened + WriteTimeout: connectTimeoutSeconds * time.Second, // hung responses must die + } + + // Close the listener on context cancellation. + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + if err := srv.Shutdown(context.Background()); err != nil { + // Error from closing listeners: + log.Errorf("HTTP server Shutdown: %v", err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := srv.Serve(listener); !errors.Is(err, http.ErrServerClosed) { + log.Warnf("unexpected (http.Server).Serve error: %v", err) + } + log.Infof("Server off") + }() + log.Infof("Noderelay server listening on %s", addr) + + go n.monitorNodeConnections() + + return &n.wg, nil +} + +// handleSourceConnect handles a connection from a source node, upgrading the +// connection to a websocket connection and adding the sourceNode to the +// relayNode.sources. +func (n *Nexus) handleSourceConnect(w http.ResponseWriter, r *http.Request) { + wg, log, ctx := &n.wg, n.log, n.ctx + conn, err := ws.NewConnection(w, r, PongWait) + if err != nil { + log.Errorf("ws connection error: %v", err) + return + } + ip := dex.NewIPKey(r.RemoteAddr) + + wg.Add(1) + go func() { + defer wg.Done() + + cl := ws.NewWSLink(ip.String(), conn, PingPeriod, nil, log) + defer cl.Disconnect() + + node := &sourceNode{ + cl: cl, + addr: r.RemoteAddr, + respHandlers: make(map[uint64]*responseHandler), + } + + registered := make(chan error) + cl.RawHandler = func(b []byte) { + var resp RelayedMessage + if err := json.Unmarshal(b, &resp); err != nil { + n.log.Errorf("error unmarshalling connect message: %v", err) + return + } + + if resp.MessageID == 0 { + node.relayID = string(resp.Body) + select { + case registered <- nil: + default: + log.Debugf("blocking node id channel") + } + return + } + if node.relayID == "" { + registered <- fmt.Errorf("received numbered request from %s before node ID", ip) + cl.Disconnect() + return + } + respHandler := node.respHandler(resp.MessageID) + if respHandler == nil { + n.log.Errorf("no handler for response from %s", ip) + return + } + respHandler.f(resp.Body, resp.Headers) + } + cm := dex.NewConnectionMaster(cl) + err := cm.ConnectOnce(ctx) // we discard the cm anyway, but good practice + if err != nil { + log.Errorf("websocketHandler client connect: %v", err) + return + } + + const readLimit = 2_097_152 // 2 MiB + cl.SetReadLimit(readLimit) + + select { + case err := <-registered: + if err != nil { + log.Error(err) + return + } + case <-time.After(connectTimeoutSeconds * time.Second): + log.Errorf("connected nexus source failed to ID") + return + case <-ctx.Done(): + return + } + + if _, found := n.relayAddrs[node.relayID]; !found { + log.Warnf("source node trying to register with unknown relay ID %s", node.relayID) + return + } + + relay, exists := n.relays[node.relayID] + if !exists { + log.Errorf("no relay with ID %s for source node connecting from %s", node.relayID, node.addr) + return + } + relay.Lock() + if oldNode, exists := relay.sources[node.addr]; exists { + oldNode.cl.Disconnect() + } + relay.sources[node.addr] = node + nodeCount := len(relay.sources) + relay.Unlock() + + log.Infof("Source node for relay %q has connected from IP %s. %d sources now serving this relay", node.relayID, node.addr, nodeCount) + + defer func() { + relay.Lock() + delete(relay.sources, node.addr) + nodeCount := len(relay.sources) + relay.Unlock() + log.Infof("Source node %s has disconnected from relay %s. %d sources now serving this relay", node.addr, node.relayID, nodeCount) + + }() + + cm.Wait() + }() +} + +// RelayedMessage is the format with which HTTP requests are routed over the +// source nodes' WebSocket connections. +type RelayedMessage struct { + MessageID uint64 `json:"messageID"` + Method string `json:"method,omitempty"` + Body dex.Bytes `json:"body"` + Headers map[string][]string `json:"headers,omitempty"` +} + +var messageIDCounter uint64 + +// runRelayServer runs a relayNode server. This server accepts requests from +// local consumers and routes them to a waiting source node connection. +func (n *Nexus) runRelayServer(relayID string) (string, error) { + log, wg := n.cfg.Logger, &n.wg + + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", fmt.Errorf("error getting nexus listener for relay ID %s: %w", relayID, err) + } + + relayAddr := l.Addr().String() + mgr := n.relays[relayID] + + // This is a request coming from a local dcrdex backend. Send it to any + // waiting sourceNode and handle the response. + handleRequest := func(w http.ResponseWriter, r *http.Request) { + // Parse the request. + b, err := io.ReadAll(r.Body) + r.Body.Close() + if err != nil { + n.log.Errorf("Error reading request for relay ID %s: %v", relayID, err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + + // Format for WebSockets. + reqID := atomic.AddUint64(&messageIDCounter, 1) + reqB, err := json.Marshal(&RelayedMessage{ + MessageID: reqID, + Method: r.Method, + Body: b, + Headers: r.Header, + }) + if err != nil { + log.Errorf("Error marshaling RelayedMessage: %v", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + + // Prepare a list of sources. + mgr.RLock() + nodeList := make([]*sourceNode, 0, len(mgr.sources)) + for _, n := range mgr.sources { + nodeList = append(nodeList, n) + } + mgr.RUnlock() + if len(nodeList) == 0 { + http.Error(w, fmt.Sprintf("No nodes connected for relay %s", relayID), http.StatusServiceUnavailable) + return + } + + // Randomly shuffle the list. + rand.Shuffle(len(nodeList), func(i, j int) { + nodeList[i], nodeList[j] = nodeList[j], nodeList[i] + }) + + // result is used to track the best result from the nodeList. The first + // non-error result is used to respond to the consumer. + type result struct { + body []byte + hdrs map[string][]string + err error + } + + var res *result + + out: + for i, node := range nodeList { + resultC := make(chan *result) + node.logReq(reqID, func(body []byte, hdrs map[string][]string) { + resultC <- &result{body: body, hdrs: hdrs} + }, func() { + resultC <- &result{err: fmt.Errorf("request expired")} + }) + + node.cl.SendRaw(reqB) + select { + case res = <-resultC: + if res.err == nil { + break out + } + log.Errorf("Error requesting data from %s node at %s: %v", node.relayID, node.addr, res.err) + if i < len(nodeList)-1 { + log.Infof("Trying another source node") + } + case <-n.ctx.Done(): + return + } + } + if res == nil || res.err != nil { + http.Error(w, "all source nodes errored", http.StatusTeapot) + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + for k, vs := range res.hdrs { + for _, v := range vs { + w.Header().Set(k, v) + } + } + w.WriteHeader(http.StatusOK) + if _, err = w.Write(res.body); err != nil { + log.Errorf("Write error: %v", err) + } + } + + // Start the server. + srv := &http.Server{ + Addr: relayAddr, + Handler: http.HandlerFunc(handleRequest), + ReadTimeout: connectTimeoutSeconds * time.Second, + WriteTimeout: connectTimeoutSeconds * time.Second, + } + + wg.Add(1) + go func() { + defer wg.Done() + if err := srv.Serve(l); !errors.Is(err, http.ErrServerClosed) { + log.Errorf("listen: %s\n", err) + } + log.Infof("Nexus no longer serving relay %s: err = %v", relayID, err) + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-n.ctx.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + log.Errorf("http.Server Shutdown errored: %v", err) + } + }() + + return relayAddr, nil +} + +// genCertPair generates a key/cert pair to the paths provided. +func genCertPair(certFile, keyFile string, hosts []string, log dex.Logger) error { + log.Infof("Generating TLS certificates...") + + org := "dcrdex nexus autogenerated cert" + validUntil := time.Now().Add(10 * 365 * 24 * time.Hour) + cert, key, err := certgen.NewTLSCertPair(elliptic.P521(), org, + validUntil, hosts) + if err != nil { + return err + } + + // Write cert and key files. + if err = os.WriteFile(certFile, cert, 0644); err != nil { + return err + } + if err = os.WriteFile(keyFile, key, 0600); err != nil { + os.Remove(certFile) + return err + } + + log.Infof("Done generating TLS certificates") + return nil +} diff --git a/server/noderelay/noderelay_test.go b/server/noderelay/noderelay_test.go new file mode 100644 index 0000000000..bd0d7e65b8 --- /dev/null +++ b/server/noderelay/noderelay_test.go @@ -0,0 +1,156 @@ +//go:build live + +package noderelay + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "decred.org/dcrdex/client/comms" + "decred.org/dcrdex/dex" +) + +func TestNexus(t *testing.T) { + netAddr, _ := net.ResolveTCPAddr("tcp", "localhost:0") + l, _ := net.ListenTCP("tcp", netAddr) + addr := l.Addr().String() + l.Close() + _, port, err := net.SplitHostPort(addr) + if err != nil { + t.Fatalf("error splitting host and port from address %q", addr) + } + + dir, err := os.MkdirTemp("", "") + if err != nil { + t.Fatalf("Error making temp dir: %v", err) + } + defer os.RemoveAll(dir) + + relayID := "0xabcanything_you-want" + + cfg := &NexusConfig{ + Port: port, + Dir: dir, + Key: filepath.Join(dir, "t.key"), + Cert: filepath.Join(dir, "t.cert"), + Logger: dex.StdOutLogger("T", dex.LevelDebug), + RelayIDs: []string{relayID}, + } + + n, err := NewNexus(cfg) + if err != nil { + t.Fatalf("NewNexus error: %v", err) + } + + certB, err := os.ReadFile(cfg.Cert) + if err != nil { + t.Fatalf("Error reading certificate file: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + if _, err = n.Connect(ctx); err != nil { + t.Fatalf("Start error: %v", err) + } + + time.Sleep(2 * time.Second) + + relayAddr, err := n.RelayAddr(relayID) + if err != nil { + t.Fatalf("RelayAddr error: %v", err) + } + + firstRequest, firstResponse := "firstRequest", "firstResponse" + var rawHandlerErr error + + var cl comms.WsConn + cl, err = comms.NewWsConn(&comms.WsCfg{ + URL: "wss://" + addr, + PingWait: 20 * time.Second, + Cert: certB, + ReconnectSync: func() { + cancel() + }, + ConnectEventFunc: func(s comms.ConnectionStatus) {}, + Logger: dex.StdOutLogger("CL", dex.LevelDebug), + RawHandler: func(b []byte) { + var req *RelayedMessage + if err := json.Unmarshal(b, &req); err != nil { + t.Fatalf("Error unmarshaling raw message: %v", err) + } + switch req.MessageID { + case 1: + if string(req.Body) != firstRequest { + rawHandlerErr = fmt.Errorf("first request had unexpected body %s != %s", string(req.Body), firstRequest) + cancel() + return + } + msgB, _ := json.Marshal(&RelayedMessage{ + MessageID: 1, + Body: []byte(firstResponse), + }) + if err := cl.SendRaw(msgB); err != nil { + rawHandlerErr = fmt.Errorf("Send error: %w", err) + cancel() + } + } + }, + }) + if err != nil { + t.Fatalf("NewWsConn error: %v", err) + } + + cm := dex.NewConnectionMaster(cl) + if err := cm.ConnectOnce(ctx); err != nil { + t.Fatalf("ConnectOnce error: %v", err) + } + + msgB, _ := json.Marshal(&RelayedMessage{ + MessageID: 0, // 0 is always the node ID + Body: []byte(relayID), + }) + + cl.SendRaw(msgB) + + select { + case <-n.WaitForSourceNodes(): + case <-ctx.Done(): + return + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for source nodes to connect") + } + + // Now act like an asset backend and send a request through the relay node. + relayURL := "http://" + relayAddr + resp, err := http.DefaultClient.Post(relayURL, "application/json", strings.NewReader(firstRequest)) + if err != nil { + t.Fatalf("Post error: %v", err) + } + + b, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + t.Fatalf("First response read error: %v", err) + } + respBody := strings.TrimSpace(string(b)) + if respBody != firstResponse { + t.Fatalf("wrong first response. %s != %s", string(b), firstResponse) + } + cancel() + cm.Wait() + n.wg.Wait() + + if rawHandlerErr != nil { + t.Fatalf("Error generated in node source request handling: %v", rawHandlerErr) + } + fmt.Println("!!!!! Success !!!!!") +}