Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Baseledger p2p client #16

Open
wants to merge 15 commits into
base: dev2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions cmd/statsdaemon/log_transceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func EthereumLogTransceiverFactory(network *network.Network) *LogTransceiver {
var wsDialer websocket.Dialer
wsConn, _, err := wsDialer.Dial(websocketURL, nil)
if err != nil {
common.Log.Errorf("Failed to establish network logs websocket connection to %s; %s", websocketURL, err.Error())
common.Log.Errorf("failed to establish network logs websocket connection to %s; %s", websocketURL, err.Error())
} else {
defer wsConn.Close()
id, _ := uuid.NewV4()
Expand All @@ -77,9 +77,9 @@ func EthereumLogTransceiverFactory(network *network.Network) *LogTransceiver {
"jsonrpc": "2.0",
}
if err := wsConn.WriteJSON(payload); err != nil {
common.Log.Errorf("Failed to write subscribe message to network logs websocket connection")
common.Log.Errorf("failed to write subscribe message to network logs websocket connection")
} else {
common.Log.Debugf("Subscribed to network logs websocket: %s", websocketURL)
common.Log.Debugf("subscribed to network logs websocket: %s", websocketURL)

for {
_, message, err := wsConn.ReadMessage()
Expand Down Expand Up @@ -116,13 +116,13 @@ func EthereumLogTransceiverFactory(network *network.Network) *LogTransceiver {

// Consume the websocket stream; attempts to fallback to JSON-RPC if websocket stream fails or is not available for the network
func (lt *LogTransceiver) consume() error {
lt.log.Debugf("Attempting to consume configured network log transceiver; attempt #%v", lt.attempt)
lt.log.Debugf("attempting to consume configured network log transceiver; attempt #%v", lt.attempt)

var err error
if lt.Stream != nil {
err = lt.Stream(lt.queue)
} else {
err = errors.New("Configured log transceiver does not have a configured Stream impl")
err = errors.New("configured log transceiver does not have a configured Stream impl")
}

return err
Expand All @@ -131,7 +131,7 @@ func (lt *LogTransceiver) consume() error {
func (lt *LogTransceiver) ingest(logmsg []byte) {
defer func() {
if r := recover(); r != nil {
common.Log.Warningf("Recovered from failed log transceiver event ingestion attempt; %s", r)
common.Log.Warningf("recovered from failed log transceiver event ingestion attempt; %s", r)
}
}()

Expand All @@ -143,7 +143,7 @@ func (lt *LogTransceiver) ingest(logmsg []byte) {
func (lt *LogTransceiver) ingestEthereum(logmsg []byte) {
err := natsutil.NatsStreamingPublish(natsLogTransceiverEmitSubject, logmsg)
if err != nil {
common.Log.Warningf("Log transceiver failed to publish %d-byte log emission message; %s", len(logmsg), err.Error())
common.Log.Warningf("log transceiver failed to publish %d-byte log emission message; %s", len(logmsg), err.Error())
}
}

Expand All @@ -155,7 +155,7 @@ func (lt *LogTransceiver) loop() error {
lt.ingest(*evt)

case <-lt.shutdownCtx.Done():
lt.log.Debugf("Closing log transceiver on shutdown")
lt.log.Debugf("closing log transceiver on shutdown")
return nil
}
}
Expand All @@ -164,14 +164,14 @@ func (lt *LogTransceiver) loop() error {
// EvictNetworkLogTransceiver evicts a single, previously-initialized log transceiver instance {
func EvictNetworkLogTransceiver(network *network.Network) error {
if daemon, ok := currentLogTransceivers[network.ID.String()]; ok {
common.Log.Debugf("Evicting log transceiver instance for network: %s; id: %s", *network.Name, network.ID)
common.Log.Debugf("evicting log transceiver instance for network: %s; id: %s", *network.Name, network.ID)
daemon.shutdown()
currentLogTransceiversMutex.Lock()
delete(currentLogTransceivers, network.ID.String())
currentLogTransceiversMutex.Unlock()
return nil
}
return fmt.Errorf("Unable to evict log transceiver instance for network: %s; id; %s", *network.Name, network.ID)
return fmt.Errorf("unable to evict log transceiver instance for network: %s; id; %s", *network.Name, network.ID)
}

// RequireNetworkLogTransceiver ensures a single log transceiver instance is running for
Expand All @@ -180,12 +180,12 @@ func EvictNetworkLogTransceiver(network *network.Network) error {
func RequireNetworkLogTransceiver(network *network.Network) *LogTransceiver {
var daemon *LogTransceiver
if daemon, ok := currentLogTransceivers[network.ID.String()]; ok {
common.Log.Debugf("Cached log transceiver instance found for network: %s; id: %s", *network.Name, network.ID)
common.Log.Debugf("cached log transceiver instance found for network: %s; id: %s", *network.Name, network.ID)
return daemon
}

currentLogTransceiversMutex.Lock()
common.Log.Infof("Initializing new log transceiver instance for network: %s; id: %s", *network.Name, network.ID)
common.Log.Infof("initializing new log transceiver instance for network: %s; id: %s", *network.Name, network.ID)
daemon = NewNetworkLogTransceiver(common.Log, network)
if daemon != nil {
currentLogTransceivers[network.ID.String()] = daemon
Expand Down Expand Up @@ -219,10 +219,10 @@ func (lt *LogTransceiver) run() error {
go func() {
for !lt.shuttingDown() {
lt.attempt++
common.Log.Debugf("Stepping into main runloop of log transceiver instance; attempt #%v", lt.attempt)
common.Log.Debugf("stepping into main runloop of log transceiver instance; attempt #%v", lt.attempt)
err := lt.consume()
if err != nil {
common.Log.Warningf("Configured network log transceiver failed to consume network log events; %s", err.Error())
common.Log.Warningf("configured network log transceiver failed to consume network log events; %s", err.Error())
if lt.backoff == 0 {
lt.backoff = 100
} else {
Expand All @@ -240,10 +240,10 @@ func (lt *LogTransceiver) run() error {
err := lt.loop()

if err == nil {
lt.log.Info("Network log transceiver exited cleanly")
lt.log.Info("network log transceiver exited cleanly")
} else {
if !lt.shuttingDown() {
common.Log.Errorf("Forcing shutdown of log transceiver due to error; %s", err)
common.Log.Errorf("forcing shutdown of log transceiver due to error; %s", err)
lt.shutdown()
}
}
Expand All @@ -252,14 +252,14 @@ func (lt *LogTransceiver) run() error {
}

func (lt *LogTransceiver) handleSignals() {
common.Log.Debug("Installing SIGINT and SIGTERM signal handlers")
common.Log.Debug("installing SIGINT and SIGTERM signal handlers")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

go func() {
select {
case sig := <-sigs:
common.Log.Infof("Received signal: %s", sig)
common.Log.Infof("received signal: %s", sig)
lt.shutdown()
case <-lt.shutdownCtx.Done():
close(sigs)
Expand All @@ -269,7 +269,7 @@ func (lt *LogTransceiver) handleSignals() {

func (lt *LogTransceiver) shutdown() {
if atomic.AddUint32(&lt.closing, 1) == 1 {
common.Log.Debugf("Shutting down log transceiver instance for network: %s", *lt.Network.Name)
common.Log.Debugf("shutting down log transceiver instance for network: %s", *lt.Network.Name)
lt.cancelF()
}
}
Expand Down
120 changes: 119 additions & 1 deletion cmd/statsdaemon/main_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,135 @@
package main

import (
"encoding/json"
"fmt"
"testing"

uuid "github.com/kthomas/go.uuid"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/provideplatform/ident/common"
"github.com/provideplatform/nchain/network"
ident "github.com/provideplatform/provide-go/api/ident"
"github.com/provideplatform/provide-go/api/nchain"
)

func TestNChainStatsdaemon(t *testing.T) {
type chainSpecConfig struct{}

type chainSpec struct {
Config *chainSpecConfig `json:"config"`
}

type chainConfig struct {
NativeCurrency *string `json:"native_currency"`
IsBaseledgerNetwork bool `json:"is_baseledger_network"`
Client *string `json:"client"`
BlockExplorerUrl *string `json:"block_explorer_url"`
JsonRpcUrl *string `json:"json_rpc_url"`
WebsocketUrl *string `json:"websocket_url"`
Platform *string `json:"platform"`
EngineID *string `json:"engine_id"`
Chain *string `json:"chain"`
ProtocolID *string `json:"protocol_id"`
NetworkID int `json:"network_id"`
ChainSpec *chainSpec `json:"chainspec"`
}

type chainDef struct {
Name *string `json:"name"`
Enabled bool `json:"enabled"`
Config *chainConfig `json:"config"`
}

func SetupBaseledgerTestNetwork() (*network.Network, error) {
testID, _ := uuid.NewV4()

email := "prvd" + testID.String() + "@email.com"
pwd := "super_secret"
_, err := ident.CreateUser("", map[string]interface{}{
"first_name": "statsdaemon first name" + testID.String(),
"last_name": "statsdaemon last name" + testID.String(),
"email": email,
"password": pwd,
})
if err != nil {
return nil, fmt.Errorf("error creating user. Error: %s", err.Error())
}

authResponse, _ := ident.Authenticate(email, pwd)
if err != nil {
return nil, fmt.Errorf("error authenticating user. Error: %s", err.Error())
}

chainySpecConfig := chainSpecConfig{}
chainySpec := chainSpec{
Config: &chainySpecConfig,
}
chainyConfig := chainConfig{
NativeCurrency: common.StringOrNil("token"),
Platform: common.StringOrNil("tendermint"),
Client: common.StringOrNil("baseledger"),
NetworkID: 3,
IsBaseledgerNetwork: true,
Chain: common.StringOrNil("peachtree"),
WebsocketUrl: common.StringOrNil("ws://genesis.peachtree.baseledger.provide.network:1337/websocket"),
ChainSpec: &chainySpec,
}

chainName := fmt.Sprintf("Baseledger Testnet %s", testID.String())

chainyChain := chainDef{
Name: common.StringOrNil(chainName),
Config: &chainyConfig,
}

chainyChainJSON, _ := json.Marshal(chainyChain)

params := map[string]interface{}{}
json.Unmarshal(chainyChainJSON, &params)

testNetwork, err := nchain.CreateNetwork(*authResponse.Token.AccessToken, params)
if err != nil {
return nil, fmt.Errorf("error creating network. Error: %s", err.Error())
}

return &network.Network{
ApplicationID: testNetwork.ApplicationID,
UserID: testNetwork.UserID,
Name: testNetwork.Name,
Description: testNetwork.Description,
Enabled: testNetwork.Enabled,
ChainID: testNetwork.ChainID,
NetworkID: testNetwork.NetworkID,
Config: testNetwork.Config,
}, nil
}

func TestNChainBaseledgerStatsdaemon(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "NChain statsdaemon Suite")
}

var _ = Describe("Main", func() {
It("Should parse one successful baseledger block header event", func() {
testNetwork, err := SetupBaseledgerTestNetwork()
if err != nil {
Fail("Failed to set up baseledger test network")
}

statsDaemon := RequireNetworkStatsDaemon(testNetwork)
// get one result and shutdown statsdaemon and check result
sampleResult := <-statsDaemon.queue
EvictNetworkStatsDaemon(testNetwork)

jsonSampleResult, _ := json.Marshal(sampleResult.Meta["last_block_header"])
formattedSampleHeaderResult := nchain.BaseledgerBlockHeaderResponse{}.Value.Header
err = json.Unmarshal(jsonSampleResult, &formattedSampleHeaderResult)
if err != nil {
Fail("Failed to unmarshall header response")
}

Expect(formattedSampleHeaderResult).NotTo(BeNil())
})
})
Loading