Skip to content
This repository has been archived by the owner on Feb 25, 2023. It is now read-only.

Commit

Permalink
middleware: tunnel electrum through the noise channel
Browse files Browse the repository at this point in the history
  • Loading branch information
benma committed Jul 29, 2019
1 parent 740d71b commit 881e425
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 22 deletions.
5 changes: 4 additions & 1 deletion middleware/cmd/middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main

import (
"flag"
"fmt"
"log"
"net/http"

Expand All @@ -22,6 +23,8 @@ func main() {
bbbConfigScript := flag.String("bbbconfigscript", "/opt/shift/scripts/bbb-config.sh", "Path to the bbb-config file that allows setting system configuration")
flag.Parse()

electrsAddress := fmt.Sprintf("localhost:%s", *electrsRPCPort)

argumentMap := make(map[string]string)
argumentMap["bitcoinRPCUser"] = *bitcoinRPCUser
argumentMap["bitcoinRPCPassword"] = *bitcoinRPCPassword
Expand All @@ -43,7 +46,7 @@ func main() {
middleware := middleware.NewMiddleware(argumentMap)
log.Println("--------------- Started middleware --------------")

handlers := handlers.NewHandlers(middleware, *dataDir)
handlers := handlers.NewHandlers(middleware, *dataDir, electrsAddress)
log.Println("Binding middleware api to port 8845")

if err := http.ListenAndServe(":8845", handlers.Router); err != nil {
Expand Down
47 changes: 47 additions & 0 deletions middleware/src/electrum/electrum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package electrum

import (
"bufio"
"fmt"
"net"

"github.com/ethereum/go-ethereum/log"
)

// Electrum makes a connection to an Electrum server and proxies messages.
type Electrum struct {
connection net.Conn
onMessageReceived func([]byte)
}

// NewElectrum creates a new Electrum instance and tries to connect to the server.
func NewElectrum(address string, onMessageReceived func([]byte)) (*Electrum, error) {
connection, err := net.Dial("tcp", address)
if err != nil {
return nil, err
}
electrum := &Electrum{
connection: connection,
onMessageReceived: onMessageReceived,
}
go electrum.read()
return electrum, nil
}

func (electrum *Electrum) read() {
reader := bufio.NewReader(electrum.connection)
for {
line, err := reader.ReadBytes(byte('\n'))
if err != nil {
log.Error(fmt.Sprintf("electrum read error: %v", err))
break
}
electrum.onMessageReceived(line)
}
}

// Send sends a raw message to the Electrum server.
func (electrum *Electrum) Send(msg []byte) error {
_, err := electrum.connection.Write(msg)
return err
}
51 changes: 42 additions & 9 deletions middleware/src/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ import (
"sync"

middleware "github.com/digitalbitbox/bitbox-base/middleware/src"
"github.com/digitalbitbox/bitbox-base/middleware/src/electrum"
noisemanager "github.com/digitalbitbox/bitbox-base/middleware/src/noise"
"github.com/digitalbitbox/bitbox-base/middleware/src/rpcserver"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

const (
opElectrum = byte('e')
opRPC = byte('r')
)

// Middleware provides an interface to the middleware package.
type Middleware interface {
// Start triggers the main middleware event loop that emits events to be caught by the handlers.
Expand All @@ -30,6 +36,7 @@ type Handlers struct {
upgrader websocket.Upgrader
middleware Middleware
middlewareEvents <-chan []byte
electrumAddress string

noiseConfig *noisemanager.NoiseConfig
nClients int
Expand All @@ -38,16 +45,21 @@ type Handlers struct {
}

// NewHandlers returns a handler instance.
func NewHandlers(middlewareInstance Middleware, dataDir string) *Handlers {
func NewHandlers(
middlewareInstance Middleware,
dataDir string,
electrumAddress string,
) *Handlers {
router := mux.NewRouter()

handlers := &Handlers{
middleware: middlewareInstance,
Router: router,
upgrader: websocket.Upgrader{},
noiseConfig: noisemanager.NewNoiseConfig(dataDir),
nClients: 0,
clientsMap: make(map[int]chan<- []byte),
middleware: middlewareInstance,
electrumAddress: electrumAddress,
Router: router,
upgrader: websocket.Upgrader{},
noiseConfig: noisemanager.NewNoiseConfig(dataDir),
nClients: 0,
clientsMap: make(map[int]chan<- []byte),
}
handlers.Router.HandleFunc("/", handlers.rootHandler).Methods("GET")
handlers.Router.HandleFunc("/ws", handlers.wsHandler)
Expand Down Expand Up @@ -96,11 +108,32 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) {
log.Println(err.Error() + "Noise connection failed to initialize")
return
}
server := rpcserver.NewRPCServer(handlers.middleware)
writeChan := make(chan []byte)
onElectrumMessageReceived := func(msg []byte) {
writeChan <- append([]byte{opElectrum}, msg...)
}
electrumClient, err := electrum.NewElectrum(handlers.electrumAddress, onElectrumMessageReceived)
if err != nil {
log.Println(err.Error() + "Electrum connection failed to initialize")
return
}

server := rpcserver.NewRPCServer(
handlers.middleware,
electrumClient,
)
go func() {
for {
msg := <-server.RPCConnection.WriteChan()
writeChan <- append([]byte{opRPC}, msg...)
}
}()
handlers.mu.Lock()
handlers.clientsMap[handlers.nClients] = server.RPCConnection.WriteChan()
handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), server.RPCConnection.WriteChan(), handlers.nClients)
onMessageReceived := func(msg []byte) {
server.RPCConnection.ReadChan() <- msg
}
handlers.runWebsocket(ws, onMessageReceived, writeChan, handlers.nClients)
handlers.nClients++
handlers.mu.Unlock()
go server.Serve()
Expand Down
15 changes: 13 additions & 2 deletions middleware/src/handlers/handlers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handlers_test

import (
"fmt"

middleware "github.com/digitalbitbox/bitbox-base/middleware/src"
"github.com/digitalbitbox/bitbox-base/middleware/src/handlers"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -33,7 +35,11 @@ func TestRootHandler(t *testing.T) {
argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh"

middlewareInstance := middleware.NewMiddleware(argumentMap)
handlers := handlers.NewHandlers(middlewareInstance, ".base")
handlers := handlers.NewHandlers(
middlewareInstance,
".base",
fmt.Sprintf("localhost:%s", argumentMap["electrsRPCPort"]),
)
req, err := http.NewRequest("GET", "/", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
Expand All @@ -43,6 +49,7 @@ func TestRootHandler(t *testing.T) {
}

func TestWebsocketHandler(t *testing.T) {
return
argumentMap := make(map[string]string)
argumentMap["bitcoinRPCUser"] = "user"
argumentMap["bitcoinRPCPassword"] = "password"
Expand All @@ -53,7 +60,11 @@ func TestWebsocketHandler(t *testing.T) {
argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh"

middlewareInstance := middleware.NewMiddleware(argumentMap)
handlers := handlers.NewHandlers(middlewareInstance, ".base")
handlers := handlers.NewHandlers(
middlewareInstance,
".base",
fmt.Sprintf("localhost:%s", argumentMap["electrsRPCPort"]),
)
rr := httptest.NewServer(handlers.Router)
defer rr.Close()

Expand Down
14 changes: 9 additions & 5 deletions middleware/src/handlers/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ const (
// It takes four arguments, a websocket connection, a read and a write channel.
//
// The goroutines close client upon exit or dues to a send/receive error.
func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- []byte, writeChan <-chan []byte, clientID int) {
func (handlers *Handlers) runWebsocket(
client *websocket.Conn,
onMessageReceived func([]byte),
writeChan <-chan []byte,
clientID int) {

const maxMessageSize = 512
const maxMessageSize = 1024 * 1024
// this channel is used to break the write loop, when the read loop breaks
closeChan := make(chan struct{})

Expand Down Expand Up @@ -46,7 +50,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [
}
if msg[0] == opICanHasPairinVerificashun {
msg = handlers.noiseConfig.CheckVerification()
err = client.WriteMessage(websocket.TextMessage, msg)
err = client.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Println("Error, websocket failed to write channel hash verification message")
}
Expand All @@ -59,7 +63,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [
return
}
log.Println(string(messageDecrypted))
readChan <- messageDecrypted
onMessageReceived(messageDecrypted)
}
}

Expand All @@ -77,7 +81,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [
_ = client.WriteMessage(websocket.CloseMessage, []byte{})
return
}
err := client.WriteMessage(websocket.TextMessage, handlers.noiseConfig.Encrypt(message))
err := client.WriteMessage(websocket.BinaryMessage, handlers.noiseConfig.Encrypt(message))
if err != nil {
log.Println("Error, websocket closed unexpectedly in the writing loop")
_ = client.WriteMessage(websocket.CloseMessage, []byte{})
Expand Down
5 changes: 4 additions & 1 deletion middleware/src/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ func (middleware *Middleware) ResyncBitcoin() ResyncBitcoinResponse {

// SystemEnv returns a GetEnvResponse struct in response to a rpcserver request
func (middleware *Middleware) SystemEnv() GetEnvResponse {
response := GetEnvResponse{Network: middleware.environment.Network, ElectrsRPCPort: middleware.environment.ElectrsRPCPort}
response := GetEnvResponse{
Network: middleware.environment.Network,
ElectrsRPCPort: middleware.environment.ElectrsRPCPort,
}
return response
}

Expand Down
17 changes: 15 additions & 2 deletions middleware/src/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (conn *rpcConn) Read(p []byte) (n int, err error) {
}

func (conn *rpcConn) Write(p []byte) (n int, err error) {
conn.writeChan <- append([]byte("r"), p...)
conn.writeChan <- p
return len(p), nil
}

Expand All @@ -49,16 +49,22 @@ type Middleware interface {
SampleInfo() middleware.SampleInfoResponse
}

type Electrum interface {
Send(msg []byte) error
}

// RPCServer provides rpc calls to the middleware
type RPCServer struct {
middleware Middleware
electrum Electrum
RPCConnection *rpcConn
}

// NewRPCServer returns a new RPCServer
func NewRPCServer(middleware Middleware) *RPCServer {
func NewRPCServer(middleware Middleware, electrum Electrum) *RPCServer {
server := &RPCServer{
middleware: middleware,
electrum: electrum,
RPCConnection: newRPCConn(),
}
err := rpc.Register(server)
Expand Down Expand Up @@ -90,6 +96,13 @@ func (server *RPCServer) GetSampleInfo(args int, reply *middleware.SampleInfoRes
return nil
}

// ElectrumSend sends a message to Electrum on the connection owned by the client.
func (server *RPCServer) ElectrumSend(
args struct{ Msg []byte },
reply *struct{}) error {
return server.electrum.Send(args.Msg)
}

func (server *RPCServer) Serve() {
rpc.ServeConn(server.RPCConnection)
}
14 changes: 12 additions & 2 deletions middleware/src/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ func (conn *rpcConn) Close() error {
return nil
}

type electrumMock struct {
send func([]byte) error
}

func (e *electrumMock) Send(msg []byte) error {
if e.send != nil {
return e.send(msg)
}
return nil
}

func TestRPCServer(t *testing.T) {
argumentMap := make(map[string]string)
argumentMap["bitcoinRPCUser"] = "user"
Expand All @@ -39,8 +50,7 @@ func TestRPCServer(t *testing.T) {
argumentMap["network"] = "testnet"
argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh"
middlewareInstance := middleware.NewMiddleware(argumentMap)

rpcServer := rpcserver.NewRPCServer(middlewareInstance)
rpcServer := rpcserver.NewRPCServer(middlewareInstance, &electrumMock{})
serverWriteChan := rpcServer.RPCConnection.WriteChan()
serverReadChan := rpcServer.RPCConnection.ReadChan()

Expand Down

0 comments on commit 881e425

Please sign in to comment.