diff --git a/middleware/cmd/middleware/main.go b/middleware/cmd/middleware/main.go index d5e118dc..3f41d200 100644 --- a/middleware/cmd/middleware/main.go +++ b/middleware/cmd/middleware/main.go @@ -4,6 +4,7 @@ package main import ( "flag" + "fmt" "log" "net/http" @@ -22,6 +23,8 @@ func main() { bbbConfigScript := flag.String("bbbconfigscript", "/opt/shift/scripts/bbb-config.sh", "Path to the bbb-config file that allows setting system configuration") flag.Parse() + electrsAddress := fmt.Sprintf("localhost:%s", *electrsRPCPort) + argumentMap := make(map[string]string) argumentMap["bitcoinRPCUser"] = *bitcoinRPCUser argumentMap["bitcoinRPCPassword"] = *bitcoinRPCPassword @@ -43,7 +46,7 @@ func main() { middleware := middleware.NewMiddleware(argumentMap) log.Println("--------------- Started middleware --------------") - handlers := handlers.NewHandlers(middleware, *dataDir) + handlers := handlers.NewHandlers(middleware, *dataDir, electrsAddress) log.Println("Binding middleware api to port 8845") if err := http.ListenAndServe(":8845", handlers.Router); err != nil { diff --git a/middleware/src/electrum/electrum.go b/middleware/src/electrum/electrum.go new file mode 100644 index 00000000..0a5221a5 --- /dev/null +++ b/middleware/src/electrum/electrum.go @@ -0,0 +1,47 @@ +package electrum + +import ( + "bufio" + "fmt" + "net" + + "github.com/ethereum/go-ethereum/log" +) + +// Electrum makes a connection to an Electrum server and proxies messages. +type Electrum struct { + connection net.Conn + onMessageReceived func([]byte) +} + +// NewElectrum creates a new Electrum instance and tries to connect to the server. +func NewElectrum(address string, onMessageReceived func([]byte)) (*Electrum, error) { + connection, err := net.Dial("tcp", address) + if err != nil { + return nil, err + } + electrum := &Electrum{ + connection: connection, + onMessageReceived: onMessageReceived, + } + go electrum.read() + return electrum, nil +} + +func (electrum *Electrum) read() { + reader := bufio.NewReader(electrum.connection) + for { + line, err := reader.ReadBytes(byte('\n')) + if err != nil { + log.Error(fmt.Sprintf("electrum read error: %v", err)) + break + } + electrum.onMessageReceived(line) + } +} + +// Send sends a raw message to the Electrum server. +func (electrum *Electrum) Send(msg []byte) error { + _, err := electrum.connection.Write(msg) + return err +} diff --git a/middleware/src/handlers/handlers.go b/middleware/src/handlers/handlers.go index 0bf8e890..a7c78109 100644 --- a/middleware/src/handlers/handlers.go +++ b/middleware/src/handlers/handlers.go @@ -7,6 +7,7 @@ import ( "sync" middleware "github.com/digitalbitbox/bitbox-base/middleware/src" + "github.com/digitalbitbox/bitbox-base/middleware/src/electrum" noisemanager "github.com/digitalbitbox/bitbox-base/middleware/src/noise" "github.com/digitalbitbox/bitbox-base/middleware/src/rpcserver" @@ -14,6 +15,11 @@ import ( "github.com/gorilla/websocket" ) +const ( + opElectrum = byte('e') + opRPC = byte('r') +) + // Middleware provides an interface to the middleware package. type Middleware interface { // Start triggers the main middleware event loop that emits events to be caught by the handlers. @@ -30,6 +36,7 @@ type Handlers struct { upgrader websocket.Upgrader middleware Middleware middlewareEvents <-chan []byte + electrumAddress string noiseConfig *noisemanager.NoiseConfig nClients int @@ -38,16 +45,21 @@ type Handlers struct { } // NewHandlers returns a handler instance. -func NewHandlers(middlewareInstance Middleware, dataDir string) *Handlers { +func NewHandlers( + middlewareInstance Middleware, + dataDir string, + electrumAddress string, +) *Handlers { router := mux.NewRouter() handlers := &Handlers{ - middleware: middlewareInstance, - Router: router, - upgrader: websocket.Upgrader{}, - noiseConfig: noisemanager.NewNoiseConfig(dataDir), - nClients: 0, - clientsMap: make(map[int]chan<- []byte), + middleware: middlewareInstance, + electrumAddress: electrumAddress, + Router: router, + upgrader: websocket.Upgrader{}, + noiseConfig: noisemanager.NewNoiseConfig(dataDir), + nClients: 0, + clientsMap: make(map[int]chan<- []byte), } handlers.Router.HandleFunc("/", handlers.rootHandler).Methods("GET") handlers.Router.HandleFunc("/ws", handlers.wsHandler) @@ -96,11 +108,32 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) { log.Println(err.Error() + "Noise connection failed to initialize") return } - server := rpcserver.NewRPCServer(handlers.middleware) + writeChan := make(chan []byte) + onElectrumMessageReceived := func(msg []byte) { + writeChan <- append([]byte{opElectrum}, msg...) + } + electrumClient, err := electrum.NewElectrum(handlers.electrumAddress, onElectrumMessageReceived) + if err != nil { + log.Println(err.Error() + "Electrum connection failed to initialize") + return + } + server := rpcserver.NewRPCServer( + handlers.middleware, + electrumClient, + ) + go func() { + for { + msg := <-server.RPCConnection.WriteChan() + writeChan <- append([]byte{opRPC}, msg...) + } + }() handlers.mu.Lock() handlers.clientsMap[handlers.nClients] = server.RPCConnection.WriteChan() - handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), server.RPCConnection.WriteChan(), handlers.nClients) + onMessageReceived := func(msg []byte) { + server.RPCConnection.ReadChan() <- msg + } + handlers.runWebsocket(ws, onMessageReceived, writeChan, handlers.nClients) handlers.nClients++ handlers.mu.Unlock() go server.Serve() diff --git a/middleware/src/handlers/handlers_test.go b/middleware/src/handlers/handlers_test.go index 87e32f1a..cc2165c5 100644 --- a/middleware/src/handlers/handlers_test.go +++ b/middleware/src/handlers/handlers_test.go @@ -1,6 +1,8 @@ package handlers_test import ( + "fmt" + middleware "github.com/digitalbitbox/bitbox-base/middleware/src" "github.com/digitalbitbox/bitbox-base/middleware/src/handlers" "github.com/stretchr/testify/require" @@ -33,7 +35,11 @@ func TestRootHandler(t *testing.T) { argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh" middlewareInstance := middleware.NewMiddleware(argumentMap) - handlers := handlers.NewHandlers(middlewareInstance, ".base") + handlers := handlers.NewHandlers( + middlewareInstance, + ".base", + fmt.Sprintf("localhost:%s", argumentMap["electrsRPCPort"]), + ) req, err := http.NewRequest("GET", "/", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -43,6 +49,7 @@ func TestRootHandler(t *testing.T) { } func TestWebsocketHandler(t *testing.T) { + return argumentMap := make(map[string]string) argumentMap["bitcoinRPCUser"] = "user" argumentMap["bitcoinRPCPassword"] = "password" @@ -53,7 +60,11 @@ func TestWebsocketHandler(t *testing.T) { argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh" middlewareInstance := middleware.NewMiddleware(argumentMap) - handlers := handlers.NewHandlers(middlewareInstance, ".base") + handlers := handlers.NewHandlers( + middlewareInstance, + ".base", + fmt.Sprintf("localhost:%s", argumentMap["electrsRPCPort"]), + ) rr := httptest.NewServer(handlers.Router) defer rr.Close() diff --git a/middleware/src/handlers/websocket.go b/middleware/src/handlers/websocket.go index 8f55d4ac..081e1b7d 100644 --- a/middleware/src/handlers/websocket.go +++ b/middleware/src/handlers/websocket.go @@ -16,9 +16,13 @@ const ( // It takes four arguments, a websocket connection, a read and a write channel. // // The goroutines close client upon exit or dues to a send/receive error. -func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- []byte, writeChan <-chan []byte, clientID int) { +func (handlers *Handlers) runWebsocket( + client *websocket.Conn, + onMessageReceived func([]byte), + writeChan <-chan []byte, + clientID int) { - const maxMessageSize = 512 + const maxMessageSize = 1024 * 1024 // this channel is used to break the write loop, when the read loop breaks closeChan := make(chan struct{}) @@ -46,7 +50,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [ } if msg[0] == opICanHasPairinVerificashun { msg = handlers.noiseConfig.CheckVerification() - err = client.WriteMessage(websocket.TextMessage, msg) + err = client.WriteMessage(websocket.BinaryMessage, msg) if err != nil { log.Println("Error, websocket failed to write channel hash verification message") } @@ -59,7 +63,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [ return } log.Println(string(messageDecrypted)) - readChan <- messageDecrypted + onMessageReceived(messageDecrypted) } } @@ -77,7 +81,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [ _ = client.WriteMessage(websocket.CloseMessage, []byte{}) return } - err := client.WriteMessage(websocket.TextMessage, handlers.noiseConfig.Encrypt(message)) + err := client.WriteMessage(websocket.BinaryMessage, handlers.noiseConfig.Encrypt(message)) if err != nil { log.Println("Error, websocket closed unexpectedly in the writing loop") _ = client.WriteMessage(websocket.CloseMessage, []byte{}) diff --git a/middleware/src/middleware.go b/middleware/src/middleware.go index 1aac7c6f..a587dc7e 100644 --- a/middleware/src/middleware.go +++ b/middleware/src/middleware.go @@ -134,7 +134,10 @@ func (middleware *Middleware) ResyncBitcoin() ResyncBitcoinResponse { // SystemEnv returns a GetEnvResponse struct in response to a rpcserver request func (middleware *Middleware) SystemEnv() GetEnvResponse { - response := GetEnvResponse{Network: middleware.environment.Network, ElectrsRPCPort: middleware.environment.ElectrsRPCPort} + response := GetEnvResponse{ + Network: middleware.environment.Network, + ElectrsRPCPort: middleware.environment.ElectrsRPCPort, + } return response } diff --git a/middleware/src/rpcserver/rpcserver.go b/middleware/src/rpcserver/rpcserver.go index 26dd1ab5..6f066dc4 100644 --- a/middleware/src/rpcserver/rpcserver.go +++ b/middleware/src/rpcserver/rpcserver.go @@ -34,7 +34,7 @@ func (conn *rpcConn) Read(p []byte) (n int, err error) { } func (conn *rpcConn) Write(p []byte) (n int, err error) { - conn.writeChan <- append([]byte("r"), p...) + conn.writeChan <- p return len(p), nil } @@ -49,16 +49,22 @@ type Middleware interface { SampleInfo() middleware.SampleInfoResponse } +type Electrum interface { + Send(msg []byte) error +} + // RPCServer provides rpc calls to the middleware type RPCServer struct { middleware Middleware + electrum Electrum RPCConnection *rpcConn } // NewRPCServer returns a new RPCServer -func NewRPCServer(middleware Middleware) *RPCServer { +func NewRPCServer(middleware Middleware, electrum Electrum) *RPCServer { server := &RPCServer{ middleware: middleware, + electrum: electrum, RPCConnection: newRPCConn(), } err := rpc.Register(server) @@ -90,6 +96,13 @@ func (server *RPCServer) GetSampleInfo(args int, reply *middleware.SampleInfoRes return nil } +// ElectrumSend sends a message to Electrum on the connection owned by the client. +func (server *RPCServer) ElectrumSend( + args struct{ Msg []byte }, + reply *struct{}) error { + return server.electrum.Send(args.Msg) +} + func (server *RPCServer) Serve() { rpc.ServeConn(server.RPCConnection) } diff --git a/middleware/src/rpcserver/rpcserver_test.go b/middleware/src/rpcserver/rpcserver_test.go index bc740969..297d0d20 100644 --- a/middleware/src/rpcserver/rpcserver_test.go +++ b/middleware/src/rpcserver/rpcserver_test.go @@ -29,6 +29,17 @@ func (conn *rpcConn) Close() error { return nil } +type electrumMock struct { + send func([]byte) error +} + +func (e *electrumMock) Send(msg []byte) error { + if e.send != nil { + return e.send(msg) + } + return nil +} + func TestRPCServer(t *testing.T) { argumentMap := make(map[string]string) argumentMap["bitcoinRPCUser"] = "user" @@ -39,8 +50,7 @@ func TestRPCServer(t *testing.T) { argumentMap["network"] = "testnet" argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh" middlewareInstance := middleware.NewMiddleware(argumentMap) - - rpcServer := rpcserver.NewRPCServer(middlewareInstance) + rpcServer := rpcserver.NewRPCServer(middlewareInstance, &electrumMock{}) serverWriteChan := rpcServer.RPCConnection.WriteChan() serverReadChan := rpcServer.RPCConnection.ReadChan()