Skip to content

Commit

Permalink
server: add a WebSockets-based reverse tunnel (#2493)
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 authored Sep 17, 2023
1 parent bf4887e commit 9ee538d
Show file tree
Hide file tree
Showing 33 changed files with 1,645 additions and 178 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ client/asset/btc/electrum/example/wallet/wallet
client/asset/eth/cmd/getgas/getgas
client/asset/eth/cmd/deploy/deploy
client/cmd/dexc-desktop/pkg/installers
server/noderelay/cmd/sourcenode/sourcenode
153 changes: 92 additions & 61 deletions client/comms/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type WsConn interface {
NextID() uint64
IsDown() bool
Send(msg *msgjson.Message) error
SendRaw(b []byte) error
Request(msg *msgjson.Message, respHandler func(*msgjson.Message)) error
RequestWithTimeout(msg *msgjson.Message, respHandler func(*msgjson.Message), expireTime time.Duration, expire func()) error
Connect(ctx context.Context) (*sync.WaitGroup, error)
Expand Down Expand Up @@ -134,6 +135,10 @@ type WsCfg struct {

// NetDialContext specifies an optional dialer context to use.
NetDialContext func(context.Context, string, string) (net.Conn, error)

// RawHandler overrides the msgjson parsing and forwards all messages to
// the provided function.
RawHandler func([]byte)
}

// wsConn represents a client websocket connection.
Expand All @@ -159,6 +164,8 @@ type wsConn struct {
reconnectCh chan struct{} // trigger for immediate reconnect
}

var _ WsConn = (*wsConn)(nil)

// NewWsConn creates a client websocket connection.
func NewWsConn(cfg *WsCfg) (WsConn, error) {
if cfg.PingWait < 0 {
Expand Down Expand Up @@ -223,6 +230,7 @@ func (conn *wsConn) connect(ctx context.Context) error {
} else {
dialer.Proxy = http.ProxyFromEnvironment
}

ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, nil)
if err != nil {
if isErrorInvalidCert(err) {
Expand Down Expand Up @@ -278,12 +286,66 @@ func (conn *wsConn) connect(ctx context.Context) error {
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
conn.read(ctx)
if conn.cfg.RawHandler != nil {
conn.readRaw(ctx)
} else {
conn.read(ctx)
}

}()

return nil
}

func (conn *wsConn) SetReadLimit(limit int64) {
conn.wsMtx.Lock()
ws := conn.ws
conn.wsMtx.Unlock()
if ws != nil {
ws.SetReadLimit(limit)
}
}

func (conn *wsConn) handleReadError(err error) {
reconnect := func() {
conn.setConnectionStatus(Disconnected)
conn.reconnectCh <- struct{}{}
}

var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL)
reconnect()
return
}
// TODO: Now that wsConn goroutines have contexts that are canceled
// on shutdown, we do not have to infer the source and severity of
// the error; just reconnect in ALL other cases, and remove the
// following legacy checks.

// Expected close errors (1000 and 1001) ... but if the server
// closes we still want to reconnect. (???)
if websocket.IsCloseError(err, websocket.CloseGoingAway,
websocket.CloseNormalClosure) ||
strings.Contains(err.Error(), "websocket: close sent") {
reconnect()
return
}

var opErr *net.OpError
if errors.As(err, &opErr) && opErr.Op == "read" {
if strings.Contains(opErr.Err.Error(), "use of closed network connection") {
conn.log.Errorf("read quitting: %v", err)
reconnect()
return
}
}

// Log all other errors and trigger a reconnection.
conn.log.Errorf("read error (%v), attempting reconnection", err)
reconnect()
}

func (conn *wsConn) close() {
// Attempt to send a close message in case the connection is still live.
msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "bye")
Expand All @@ -293,14 +355,30 @@ func (conn *wsConn) close() {
conn.ws.Close()
}

func (conn *wsConn) readRaw(ctx context.Context) {
for {
if ctx.Err() != nil {
return
}

// Lock since conn.ws may be set by connect.
conn.wsMtx.Lock()
ws := conn.ws
conn.wsMtx.Unlock()

// Block until a message is received or an error occurs.
_, msgBytes, err := ws.ReadMessage()
if err != nil {
conn.handleReadError(err)
return
}
conn.cfg.RawHandler(msgBytes)
}
}

// read fetches and parses incoming messages for processing. This should be
// run as a goroutine. Increment the wg before calling read.
func (conn *wsConn) read(ctx context.Context) {
reconnect := func() {
conn.setConnectionStatus(Disconnected)
conn.reconnectCh <- struct{}{}
}

for {
msg := new(msgjson.Message)

Expand All @@ -317,57 +395,22 @@ func (conn *wsConn) read(ctx context.Context) {
return
}
if err != nil {
// Read timeout should flag the connection as down asap.
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL)
reconnect()
return
}

var mErr *json.UnmarshalTypeError
if errors.As(err, &mErr) {
// JSON decode errors are not fatal, log and proceed.
conn.log.Errorf("json decode error: %v", mErr)
continue
}

// TODO: Now that wsConn goroutines have contexts that are canceled
// on shutdown, we do not have to infer the source and severity of
// the error; just reconnect in ALL other cases, and remove the
// following legacy checks.

// Expected close errors (1000 and 1001) ... but if the server
// closes we still want to reconnect. (???)
if websocket.IsCloseError(err, websocket.CloseGoingAway,
websocket.CloseNormalClosure) ||
strings.Contains(err.Error(), "websocket: close sent") {
reconnect()
return
}

var opErr *net.OpError
if errors.As(err, &opErr) && opErr.Op == "read" {
if strings.Contains(opErr.Err.Error(),
"use of closed network connection") {
conn.log.Errorf("read quitting: %v", err)
reconnect()
return
}
}

// Log all other errors and trigger a reconnection.
conn.log.Errorf("read error (%v), attempting reconnection", err)
reconnect()
// Successful reconnect via connect() will start read() again.
conn.handleReadError(err)
return
}

// If the message is a response, find the handler.
if msg.Type == msgjson.Response {
handler := conn.respHandler(msg.ID)
if handler == nil {
conn.log.Errorf("unhandled response with error msg: %v", handleUnknownResponse(msg))
b, _ := json.Marshal(msg)
conn.log.Errorf("No handler found for response: %v", string(b))
continue
}
// Run handlers in a goroutine so that other messages can be
Expand Down Expand Up @@ -496,12 +539,6 @@ func (conn *wsConn) Stop() {
conn.cancel()
}

func (conn *wsConn) SendRaw(b []byte) error {
conn.wsMtx.Lock()
defer conn.wsMtx.Unlock()
return conn.ws.WriteMessage(websocket.TextMessage, b)
}

// Send pushes outgoing messages over the websocket connection. Sending of the
// message is synchronous, so a nil error guarantees that the message was
// successfully sent. A non-nil error may indicate that the connection is known
Expand All @@ -519,10 +556,14 @@ func (conn *wsConn) Send(msg *msgjson.Message) error {
conn.log.Errorf("Failed to marshal message: %v", err)
return err
}
return conn.SendRaw(b)
}

// SendRaw sends a raw byte string over the websocket connection.
func (conn *wsConn) SendRaw(b []byte) error {
conn.wsMtx.Lock()
defer conn.wsMtx.Unlock()
err = conn.ws.SetWriteDeadline(time.Now().Add(writeWait))
err := conn.ws.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
conn.log.Errorf("Send: failed to set write deadline: %v", err)
return err
Expand Down Expand Up @@ -636,13 +677,3 @@ func (conn *wsConn) respHandler(id uint64) *responseHandler {
func (conn *wsConn) MessageSource() <-chan *msgjson.Message {
return conn.readCh
}

// handleUnknownResponse extracts the error message sent for a response without
// a handler.
func handleUnknownResponse(msg *msgjson.Message) error {
resp, err := msg.Response()
if err != nil {
return err
}
return resp.Error
}
4 changes: 4 additions & 0 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ func (conn *TWebsocket) Send(msg *msgjson.Message) error {

return conn.sendErr
}

func (conn *TWebsocket) SendRaw([]byte) error {
return conn.sendErr
}
func (conn *TWebsocket) Request(msg *msgjson.Message, f msgFunc) error {
return conn.RequestWithTimeout(msg, f, 0, func() {})
}
Expand Down
64 changes: 60 additions & 4 deletions dex/testing/dcrdex/harness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,29 @@ EOF
else echo "WARNING: Dash is not running. Configuring dcrdex markets without DASH."
fi

# run with NODERELAY=1 to use a node relay for the bitcoin node.
BTC_NODERELAY_ID=""
DCR_NODERELAY_ID=""
BTC_CONFIG_PATH="${TEST_ROOT}/btc/alpha/alpha.conf"
DCR_CONFIG_PATH="${TEST_ROOT}/dcr/alpha/dcrd.conf"
if [[ -n ${NODERELAY} ]]; then
BTC_NODERELAY_ID="btc_a21afba3"
DCR_NODERELAY_ID="dcr_a21afba3"
RELAY_CONF_PATH="${TEST_ROOT}/btc/alpha/alpha_noderelay.conf"
if [ ! -f "${RELAY_CONF_PATH}" ]; then
cp "${BTC_CONFIG_PATH}" "${RELAY_CONF_PATH}"
echo "rpcbind=noderelay:${BTC_NODERELAY_ID}" >> "${RELAY_CONF_PATH}"
fi
BTC_CONFIG_PATH="${RELAY_CONF_PATH}"

RELAY_CONF_PATH="${TEST_ROOT}/dcr/alpha/dcrd_noderelay.conf"
if [ ! -f "${RELAY_CONF_PATH}" ]; then
cp "${DCR_CONFIG_PATH}" "${RELAY_CONF_PATH}"
echo "rpclisten=noderelay:${DCR_NODERELAY_ID}" >> "${RELAY_CONF_PATH}"
fi
DCR_CONFIG_PATH="${RELAY_CONF_PATH}"
fi

cat << EOF >> "./markets.json"
}
],
Expand All @@ -256,24 +279,26 @@ cat << EOF >> "./markets.json"
"network": "simnet",
"maxFeeRate": 10,
"swapConf": 1,
"configPath": "${TEST_ROOT}/dcr/alpha/dcrd.conf",
"configPath": "${DCR_CONFIG_PATH}",
"regConfs": 1,
"regFee": 100000000,
"regXPub": "spubVWKGn9TGzyo7M4b5xubB5UV4joZ5HBMNBmMyGvYEaoZMkSxVG4opckpmQ26E85iHg8KQxrSVTdex56biddqtXBerG9xMN8Dvb3eNQVFFwpE",
"bondAmt": 1000000000,
"bondConfs": 1
"bondConfs": 1,
"nodeRelayID": "${DCR_NODERELAY_ID}"
},
"BTC_simnet": {
"bip44symbol": "btc",
"network": "simnet",
"maxFeeRate": 100,
"swapConf": 1,
"configPath": "${TEST_ROOT}/btc/alpha/alpha.conf",
"configPath": "${BTC_CONFIG_PATH}",
"regConfs": 2,
"regFee": 20000000,
"regXPub": "vpub5SLqN2bLY4WeZJ9SmNJHsyzqVKreTXD4ZnPC22MugDNcjhKX5xNX9QiQWcE4SSRzVWyHWUihpKRT7hckDGNzVc69wSX2JPcfGeNiT5c2XZy",
"bondAmt": 10000000,
"bondConfs": 1
"bondConfs": 1,
"nodeRelayID": "${BTC_NODERELAY_ID}"
EOF

Expand Down Expand Up @@ -452,6 +477,7 @@ maxepochcancels=128
inittakerlotlimit=40
abstakerlotlimit=1200
httpprof=1
noderelayaddr=127.0.0.1:17539
EOF

# Set the postgres user pass if provided.
Expand Down Expand Up @@ -508,6 +534,10 @@ export SHELL=$(which bash)
cat > "${DCRDEX_DATA_DIR}/quit" <<EOF
#!/usr/bin/env bash
tmux send-keys -t $SESSION:0 C-c
if [ -n "${NODERELAY}" ] ; then
tmux send-keys -t $SESSION:1 C-c
tmux wait-for donenoderelay
fi
tmux wait-for donedex
tmux kill-session -t $SESSION
EOF
Expand All @@ -523,5 +553,31 @@ chmod +x "${DCRDEX_DATA_DIR}/run"
echo "Starting dcrdex"
tmux new-session -d -s $SESSION $SHELL
tmux rename-window -t $SESSION:0 'dcrdex'

if [ -n "${NODERELAY}" ]; then
SOURCENODE_DIR=$(realpath "${HARNESS_DIR}/../../../server/noderelay/cmd/sourcenode/")
cd ${SOURCENODE_DIR}
go build -o ${DCRDEX_DATA_DIR}/sourcenode
cd "${DCRDEX_DATA_DIR}"

RPC_PORT="20556"
RELAYFILE="${DCRDEX_DATA_DIR}/data/simnet/noderelay/relay-files/${BTC_NODERELAY_ID}.relayfile"

tmux new-window -t $SESSION:1 -n 'sourcenode_btc' $SHELL
# dcrdex needs to write the relayfiles.
tmux send-keys -t $SESSION:1 "sleep 4" C-m
tmux send-keys -t $SESSION:1 "./sourcenode --port ${RPC_PORT} --relayfile ${RELAYFILE}; tmux wait-for -S donenoderelay" C-m

# Decred
RPC_PORT="19561"
RELAYFILE="${DCRDEX_DATA_DIR}/data/simnet/noderelay/relay-files/${DCR_NODERELAY_ID}.relayfile"
DCRD_CERT="${TEST_ROOT}/dcr/alpha/rpc.cert"

tmux new-window -t $SESSION:2 -n 'sourcenode_dcr' $SHELL
tmux send-keys -t $SESSION:2 "sleep 4" C-m
tmux send-keys -t $SESSION:2 "./sourcenode --port ${RPC_PORT} --relayfile ${RELAYFILE} --localcert ${DCRD_CERT}; tmux wait-for -S donenoderelay" C-m
fi

tmux send-keys -t $SESSION:0 "${DCRDEX_DATA_DIR}/run" C-m
tmux select-window -t $SESSION:0
tmux attach-session -t $SESSION
Loading

0 comments on commit 9ee538d

Please sign in to comment.