Skip to content
This repository has been archived by the owner on Feb 25, 2023. It is now read-only.

Commit

Permalink
middleware: Electrum Tunnel Unit tests
Browse files Browse the repository at this point in the history
This commit adapts the unit tests to the changes introduced in the
previous commit for the electrum tunneling. The electrum client is now
only spawned with the first electrum rpc call.

The electrum client opens a new tcp connection to the electrum server
for each connected client of the middleware.
  • Loading branch information
TheCharlatan committed Aug 2, 2019
1 parent ee42260 commit e5a098d
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 45 deletions.
9 changes: 5 additions & 4 deletions middleware/src/electrum/electrum.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Package electrum reads messages from a connected electrum server over tcp and passes the read value to a callback function. It also sends messages to the electrum server over tcp
package electrum

import (
"bufio"
"fmt"
"log"
"net"

"github.com/ethereum/go-ethereum/log"
)

// Electrum makes a connection to an Electrum server and proxies messages.
Expand All @@ -28,19 +28,20 @@ func NewElectrum(address string, onMessageReceived func([]byte)) (*Electrum, err
return electrum, nil
}

// read raw message from Electrum server
func (electrum *Electrum) read() {
reader := bufio.NewReader(electrum.connection)
for {
line, err := reader.ReadBytes(byte('\n'))
if err != nil {
log.Error(fmt.Sprintf("electrum read error: %v", err))
log.Println(fmt.Sprintf("electrum read error: %v", err))
break
}
electrum.onMessageReceived(line)
}
}

// Send sends a raw message to the Electrum server.
// Send a raw message to the Electrum server.
func (electrum *Electrum) Send(msg []byte) error {
_, err := electrum.connection.Write(msg)
return err
Expand Down
16 changes: 4 additions & 12 deletions middleware/src/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"

middleware "github.com/digitalbitbox/bitbox-base/middleware/src"
"github.com/digitalbitbox/bitbox-base/middleware/src/electrum"
noisemanager "github.com/digitalbitbox/bitbox-base/middleware/src/noise"
"github.com/digitalbitbox/bitbox-base/middleware/src/rpcserver"

Expand Down Expand Up @@ -112,15 +111,11 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) {
onElectrumMessageReceived := func(msg []byte) {
writeChan <- append([]byte{opElectrum}, msg...)
}
electrumClient, err := electrum.NewElectrum(handlers.electrumAddress, onElectrumMessageReceived)
if err != nil {
log.Println(err.Error() + "Electrum connection failed to initialize")
return
}

server := rpcserver.NewRPCServer(
handlers.middleware,
electrumClient,
handlers.electrumAddress,
onElectrumMessageReceived,
)
go func() {
for {
Expand All @@ -129,11 +124,8 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) {
}
}()
handlers.mu.Lock()
handlers.clientsMap[handlers.nClients] = server.RPCConnection.WriteChan()
onMessageReceived := func(msg []byte) {
server.RPCConnection.ReadChan() <- msg
}
handlers.runWebsocket(ws, onMessageReceived, writeChan, handlers.nClients)
handlers.clientsMap[handlers.nClients] = writeChan
handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), writeChan, handlers.nClients)
handlers.nClients++
handlers.mu.Unlock()
go server.Serve()
Expand Down
1 change: 0 additions & 1 deletion middleware/src/handlers/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func TestRootHandler(t *testing.T) {
}

func TestWebsocketHandler(t *testing.T) {
return
argumentMap := make(map[string]string)
argumentMap["bitcoinRPCUser"] = "user"
argumentMap["bitcoinRPCPassword"] = "password"
Expand Down
4 changes: 2 additions & 2 deletions middleware/src/handlers/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
// The goroutines close client upon exit or dues to a send/receive error.
func (handlers *Handlers) runWebsocket(
client *websocket.Conn,
onMessageReceived func([]byte),
readChan chan<- []byte,
writeChan <-chan []byte,
clientID int) {

Expand Down Expand Up @@ -63,7 +63,7 @@ func (handlers *Handlers) runWebsocket(
return
}
log.Println(string(messageDecrypted))
onMessageReceived(messageDecrypted)
readChan <- messageDecrypted
}
}

Expand Down
10 changes: 6 additions & 4 deletions middleware/src/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ const (

// Middleware connects to services on the base with provided parrameters and emits events for the handler.
type Middleware struct {
info SampleInfoResponse
environment system.Environment
events chan []byte
info SampleInfoResponse
environment system.Environment
events chan []byte
electrumEvents chan []byte
}

// NewMiddleware returns a new instance of the middleware
func NewMiddleware(argumentMap map[string]string) *Middleware {
middleware := &Middleware{
environment: system.NewEnvironment(argumentMap),
//TODO(TheCharlatan) find a better way to increase the channel size
events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint
events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint
electrumEvents: make(chan []byte),
info: SampleInfoResponse{
Blocks: 0,
Difficulty: 0.0,
Expand Down
29 changes: 21 additions & 8 deletions middleware/src/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/rpc"

middleware "github.com/digitalbitbox/bitbox-base/middleware/src"
"github.com/digitalbitbox/bitbox-base/middleware/src/electrum"
)

type rpcConn struct {
Expand Down Expand Up @@ -55,17 +56,21 @@ type Electrum interface {

// RPCServer provides rpc calls to the middleware
type RPCServer struct {
middleware Middleware
electrum Electrum
RPCConnection *rpcConn
middleware Middleware
electrum Electrum
electrumAddress string
onElectrumMessageReceived func(msg []byte)
RPCConnection *rpcConn
}

// NewRPCServer returns a new RPCServer
func NewRPCServer(middleware Middleware, electrum Electrum) *RPCServer {
func NewRPCServer(middleware Middleware, electrumAddress string, onElectrumMessageReceived func(msg []byte)) *RPCServer { //, electrum Electrum) *RPCServer {
server := &RPCServer{
middleware: middleware,
electrum: electrum,
RPCConnection: newRPCConn(),
middleware: middleware,
//electrum: electrum,
electrumAddress: electrumAddress,
onElectrumMessageReceived: onElectrumMessageReceived,
RPCConnection: newRPCConn(),
}
err := rpc.Register(server)
if err != nil {
Expand Down Expand Up @@ -96,10 +101,18 @@ func (server *RPCServer) GetSampleInfo(args int, reply *middleware.SampleInfoRes
return nil
}

// ElectrumSend sends a message to Electrum on the connection owned by the client.
// ElectrumSend sends a message to the Electrum server on the connection owned by the client.
func (server *RPCServer) ElectrumSend(
args struct{ Msg []byte },
reply *struct{}) error {
if server.electrum == nil {
electrumClient, err := electrum.NewElectrum(server.electrumAddress, server.onElectrumMessageReceived)
server.electrum = electrumClient
if err != nil {
log.Println(err.Error() + "Electrum connection failed to initialize")
return err
}
}
return server.electrum.Send(args.Msg)
}

Expand Down
19 changes: 5 additions & 14 deletions middleware/src/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,6 @@ func (conn *rpcConn) Close() error {
return nil
}

type electrumMock struct {
send func([]byte) error
}

func (e *electrumMock) Send(msg []byte) error {
if e.send != nil {
return e.send(msg)
}
return nil
}

func TestRPCServer(t *testing.T) {
argumentMap := make(map[string]string)
argumentMap["bitcoinRPCUser"] = "user"
Expand All @@ -50,7 +39,7 @@ func TestRPCServer(t *testing.T) {
argumentMap["network"] = "testnet"
argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh"
middlewareInstance := middleware.NewMiddleware(argumentMap)
rpcServer := rpcserver.NewRPCServer(middlewareInstance, &electrumMock{})
rpcServer := rpcserver.NewRPCServer(middlewareInstance, "localhost:80801", func([]byte) {})
serverWriteChan := rpcServer.RPCConnection.WriteChan()
serverReadChan := rpcServer.RPCConnection.ReadChan()

Expand All @@ -73,9 +62,11 @@ func TestRPCServer(t *testing.T) {
msgRequest := <-clientWriteChan
serverReadChan <- msgRequest
msgResponse := <-serverWriteChan
//t.Logf("significant byte: %s", string(msgResponse[0]))
t.Logf("response message %s", string(msgResponse))
// Cut off the significant Byte in the response
clientReadChan <- msgResponse[1:]
//t.Logf("significant byte: %s", string(msgResponse[1]))
clientReadChan <- msgResponse
wg.Wait()
t.Logf("reply: %v", reply)
require.Equal(t, "testnet", reply.Network)
Expand All @@ -96,7 +87,7 @@ func TestRPCServer(t *testing.T) {
msgResponse = <-serverWriteChan
t.Logf("Resync Bitcoin Response %q", string(msgResponse))
// Cut off the significant Byte in the response
clientReadChan <- msgResponse[1:]
clientReadChan <- msgResponse
wg.Wait()
require.Equal(t, false, resyncReply.Success)
}

0 comments on commit e5a098d

Please sign in to comment.