diff --git a/cmd/statsdaemon/log_transceiver.go b/cmd/statsdaemon/log_transceiver.go index dee6eed4..423f2dba 100644 --- a/cmd/statsdaemon/log_transceiver.go +++ b/cmd/statsdaemon/log_transceiver.go @@ -65,7 +65,7 @@ func EthereumLogTransceiverFactory(network *network.Network) *LogTransceiver { var wsDialer websocket.Dialer wsConn, _, err := wsDialer.Dial(websocketURL, nil) if err != nil { - common.Log.Errorf("Failed to establish network logs websocket connection to %s; %s", websocketURL, err.Error()) + common.Log.Errorf("failed to establish network logs websocket connection to %s; %s", websocketURL, err.Error()) } else { defer wsConn.Close() id, _ := uuid.NewV4() @@ -77,9 +77,9 @@ func EthereumLogTransceiverFactory(network *network.Network) *LogTransceiver { "jsonrpc": "2.0", } if err := wsConn.WriteJSON(payload); err != nil { - common.Log.Errorf("Failed to write subscribe message to network logs websocket connection") + common.Log.Errorf("failed to write subscribe message to network logs websocket connection") } else { - common.Log.Debugf("Subscribed to network logs websocket: %s", websocketURL) + common.Log.Debugf("subscribed to network logs websocket: %s", websocketURL) for { _, message, err := wsConn.ReadMessage() @@ -116,13 +116,13 @@ func EthereumLogTransceiverFactory(network *network.Network) *LogTransceiver { // Consume the websocket stream; attempts to fallback to JSON-RPC if websocket stream fails or is not available for the network func (lt *LogTransceiver) consume() error { - lt.log.Debugf("Attempting to consume configured network log transceiver; attempt #%v", lt.attempt) + lt.log.Debugf("attempting to consume configured network log transceiver; attempt #%v", lt.attempt) var err error if lt.Stream != nil { err = lt.Stream(lt.queue) } else { - err = errors.New("Configured log transceiver does not have a configured Stream impl") + err = errors.New("configured log transceiver does not have a configured Stream impl") } return err @@ -131,7 +131,7 @@ func (lt *LogTransceiver) consume() error { func (lt *LogTransceiver) ingest(logmsg []byte) { defer func() { if r := recover(); r != nil { - common.Log.Warningf("Recovered from failed log transceiver event ingestion attempt; %s", r) + common.Log.Warningf("recovered from failed log transceiver event ingestion attempt; %s", r) } }() @@ -143,7 +143,7 @@ func (lt *LogTransceiver) ingest(logmsg []byte) { func (lt *LogTransceiver) ingestEthereum(logmsg []byte) { err := natsutil.NatsStreamingPublish(natsLogTransceiverEmitSubject, logmsg) if err != nil { - common.Log.Warningf("Log transceiver failed to publish %d-byte log emission message; %s", len(logmsg), err.Error()) + common.Log.Warningf("log transceiver failed to publish %d-byte log emission message; %s", len(logmsg), err.Error()) } } @@ -155,7 +155,7 @@ func (lt *LogTransceiver) loop() error { lt.ingest(*evt) case <-lt.shutdownCtx.Done(): - lt.log.Debugf("Closing log transceiver on shutdown") + lt.log.Debugf("closing log transceiver on shutdown") return nil } } @@ -164,14 +164,14 @@ func (lt *LogTransceiver) loop() error { // EvictNetworkLogTransceiver evicts a single, previously-initialized log transceiver instance { func EvictNetworkLogTransceiver(network *network.Network) error { if daemon, ok := currentLogTransceivers[network.ID.String()]; ok { - common.Log.Debugf("Evicting log transceiver instance for network: %s; id: %s", *network.Name, network.ID) + common.Log.Debugf("evicting log transceiver instance for network: %s; id: %s", *network.Name, network.ID) daemon.shutdown() currentLogTransceiversMutex.Lock() delete(currentLogTransceivers, network.ID.String()) currentLogTransceiversMutex.Unlock() return nil } - return fmt.Errorf("Unable to evict log transceiver instance for network: %s; id; %s", *network.Name, network.ID) + return fmt.Errorf("unable to evict log transceiver instance for network: %s; id; %s", *network.Name, network.ID) } // RequireNetworkLogTransceiver ensures a single log transceiver instance is running for @@ -180,12 +180,12 @@ func EvictNetworkLogTransceiver(network *network.Network) error { func RequireNetworkLogTransceiver(network *network.Network) *LogTransceiver { var daemon *LogTransceiver if daemon, ok := currentLogTransceivers[network.ID.String()]; ok { - common.Log.Debugf("Cached log transceiver instance found for network: %s; id: %s", *network.Name, network.ID) + common.Log.Debugf("cached log transceiver instance found for network: %s; id: %s", *network.Name, network.ID) return daemon } currentLogTransceiversMutex.Lock() - common.Log.Infof("Initializing new log transceiver instance for network: %s; id: %s", *network.Name, network.ID) + common.Log.Infof("initializing new log transceiver instance for network: %s; id: %s", *network.Name, network.ID) daemon = NewNetworkLogTransceiver(common.Log, network) if daemon != nil { currentLogTransceivers[network.ID.String()] = daemon @@ -219,10 +219,10 @@ func (lt *LogTransceiver) run() error { go func() { for !lt.shuttingDown() { lt.attempt++ - common.Log.Debugf("Stepping into main runloop of log transceiver instance; attempt #%v", lt.attempt) + common.Log.Debugf("stepping into main runloop of log transceiver instance; attempt #%v", lt.attempt) err := lt.consume() if err != nil { - common.Log.Warningf("Configured network log transceiver failed to consume network log events; %s", err.Error()) + common.Log.Warningf("configured network log transceiver failed to consume network log events; %s", err.Error()) if lt.backoff == 0 { lt.backoff = 100 } else { @@ -240,10 +240,10 @@ func (lt *LogTransceiver) run() error { err := lt.loop() if err == nil { - lt.log.Info("Network log transceiver exited cleanly") + lt.log.Info("network log transceiver exited cleanly") } else { if !lt.shuttingDown() { - common.Log.Errorf("Forcing shutdown of log transceiver due to error; %s", err) + common.Log.Errorf("forcing shutdown of log transceiver due to error; %s", err) lt.shutdown() } } @@ -252,14 +252,14 @@ func (lt *LogTransceiver) run() error { } func (lt *LogTransceiver) handleSignals() { - common.Log.Debug("Installing SIGINT and SIGTERM signal handlers") + common.Log.Debug("installing SIGINT and SIGTERM signal handlers") sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { select { case sig := <-sigs: - common.Log.Infof("Received signal: %s", sig) + common.Log.Infof("received signal: %s", sig) lt.shutdown() case <-lt.shutdownCtx.Done(): close(sigs) @@ -269,7 +269,7 @@ func (lt *LogTransceiver) handleSignals() { func (lt *LogTransceiver) shutdown() { if atomic.AddUint32(<.closing, 1) == 1 { - common.Log.Debugf("Shutting down log transceiver instance for network: %s", *lt.Network.Name) + common.Log.Debugf("shutting down log transceiver instance for network: %s", *lt.Network.Name) lt.cancelF() } } diff --git a/cmd/statsdaemon/main_test.go b/cmd/statsdaemon/main_test.go index 72190280..6623ab94 100644 --- a/cmd/statsdaemon/main_test.go +++ b/cmd/statsdaemon/main_test.go @@ -1,17 +1,135 @@ package main import ( + "encoding/json" + "fmt" "testing" + uuid "github.com/kthomas/go.uuid" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + + "github.com/provideplatform/ident/common" + "github.com/provideplatform/nchain/network" + ident "github.com/provideplatform/provide-go/api/ident" + "github.com/provideplatform/provide-go/api/nchain" ) -func TestNChainStatsdaemon(t *testing.T) { +type chainSpecConfig struct{} + +type chainSpec struct { + Config *chainSpecConfig `json:"config"` +} + +type chainConfig struct { + NativeCurrency *string `json:"native_currency"` + IsBaseledgerNetwork bool `json:"is_baseledger_network"` + Client *string `json:"client"` + BlockExplorerUrl *string `json:"block_explorer_url"` + JsonRpcUrl *string `json:"json_rpc_url"` + WebsocketUrl *string `json:"websocket_url"` + Platform *string `json:"platform"` + EngineID *string `json:"engine_id"` + Chain *string `json:"chain"` + ProtocolID *string `json:"protocol_id"` + NetworkID int `json:"network_id"` + ChainSpec *chainSpec `json:"chainspec"` +} + +type chainDef struct { + Name *string `json:"name"` + Enabled bool `json:"enabled"` + Config *chainConfig `json:"config"` +} + +func SetupBaseledgerTestNetwork() (*network.Network, error) { + testID, _ := uuid.NewV4() + + email := "prvd" + testID.String() + "@email.com" + pwd := "super_secret" + _, err := ident.CreateUser("", map[string]interface{}{ + "first_name": "statsdaemon first name" + testID.String(), + "last_name": "statsdaemon last name" + testID.String(), + "email": email, + "password": pwd, + }) + if err != nil { + return nil, fmt.Errorf("error creating user. Error: %s", err.Error()) + } + + authResponse, _ := ident.Authenticate(email, pwd) + if err != nil { + return nil, fmt.Errorf("error authenticating user. Error: %s", err.Error()) + } + + chainySpecConfig := chainSpecConfig{} + chainySpec := chainSpec{ + Config: &chainySpecConfig, + } + chainyConfig := chainConfig{ + NativeCurrency: common.StringOrNil("token"), + Platform: common.StringOrNil("tendermint"), + Client: common.StringOrNil("baseledger"), + NetworkID: 3, + IsBaseledgerNetwork: true, + Chain: common.StringOrNil("peachtree"), + WebsocketUrl: common.StringOrNil("ws://genesis.peachtree.baseledger.provide.network:1337/websocket"), + ChainSpec: &chainySpec, + } + + chainName := fmt.Sprintf("Baseledger Testnet %s", testID.String()) + + chainyChain := chainDef{ + Name: common.StringOrNil(chainName), + Config: &chainyConfig, + } + + chainyChainJSON, _ := json.Marshal(chainyChain) + + params := map[string]interface{}{} + json.Unmarshal(chainyChainJSON, ¶ms) + + testNetwork, err := nchain.CreateNetwork(*authResponse.Token.AccessToken, params) + if err != nil { + return nil, fmt.Errorf("error creating network. Error: %s", err.Error()) + } + + return &network.Network{ + ApplicationID: testNetwork.ApplicationID, + UserID: testNetwork.UserID, + Name: testNetwork.Name, + Description: testNetwork.Description, + Enabled: testNetwork.Enabled, + ChainID: testNetwork.ChainID, + NetworkID: testNetwork.NetworkID, + Config: testNetwork.Config, + }, nil +} + +func TestNChainBaseledgerStatsdaemon(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "NChain statsdaemon Suite") } var _ = Describe("Main", func() { + It("Should parse one successful baseledger block header event", func() { + testNetwork, err := SetupBaseledgerTestNetwork() + if err != nil { + Fail("Failed to set up baseledger test network") + } + + statsDaemon := RequireNetworkStatsDaemon(testNetwork) + // get one result and shutdown statsdaemon and check result + sampleResult := <-statsDaemon.queue + EvictNetworkStatsDaemon(testNetwork) + + jsonSampleResult, _ := json.Marshal(sampleResult.Meta["last_block_header"]) + formattedSampleHeaderResult := nchain.BaseledgerBlockHeaderResponse{}.Value.Header + err = json.Unmarshal(jsonSampleResult, &formattedSampleHeaderResult) + if err != nil { + Fail("Failed to unmarshall header response") + } + Expect(formattedSampleHeaderResult).NotTo(BeNil()) + }) }) diff --git a/cmd/statsdaemon/stats_daemon.go b/cmd/statsdaemon/stats_daemon.go index e15b39f3..a5ec082f 100644 --- a/cmd/statsdaemon/stats_daemon.go +++ b/cmd/statsdaemon/stats_daemon.go @@ -26,6 +26,7 @@ import ( uuid "github.com/kthomas/go.uuid" "github.com/provideplatform/nchain/common" "github.com/provideplatform/nchain/network" + "github.com/provideplatform/provide-go/api/nchain" provide "github.com/provideplatform/provide-go/api/nchain" providecrypto "github.com/provideplatform/provide-go/crypto" ) @@ -125,7 +126,7 @@ func BcoinNetworkStatsDataSourceFactory(network *network.Network) *NetworkStatsD useBCInfoWebsocket := false if websocketURL == blockchainInfoWebsocketURL { - common.Log.Debugf("Enabling blockchain.info websocket on network stats websocket for configured network: %s", *network.Name) + common.Log.Debugf("enabling blockchain.info websocket on network stats websocket for configured network: %s", *network.Name) useBCInfoWebsocket = true } @@ -138,14 +139,14 @@ func BcoinNetworkStatsDataSourceFactory(network *network.Network) *NetworkStatsD var wsDialer websocket.Dialer wsConn, _, err := wsDialer.Dial(websocketURL, nil) if err != nil { - common.Log.Errorf("Failed to establish network stats websocket connection to %s; %s", websocketURL, err.Error()) + common.Log.Errorf("failed to establish network stats websocket connection to %s; %s", websocketURL, err.Error()) } else { defer wsConn.Close() payload := map[string]interface{}{ "op": "ping_block", } if err := wsConn.WriteJSON(payload); err != nil { - common.Log.Errorf("Failed to write ping_block message to blockchain.info network stats websocket connection") + common.Log.Errorf("failed to write ping_block message to blockchain.info network stats websocket connection") return err } @@ -153,25 +154,25 @@ func BcoinNetworkStatsDataSourceFactory(network *network.Network) *NetworkStatsD "op": "blocks_sub", } if err := wsConn.WriteJSON(payload); err != nil { - common.Log.Errorf("Failed to write block subcription message to blockchain.info network stats websocket connection") + common.Log.Errorf("failed to write block subcription message to blockchain.info network stats websocket connection") } else { - common.Log.Debugf("Subscribed to block headers from blockchain.info network stats websocket: %s", websocketURL) + common.Log.Debugf("subscribed to block headers from blockchain.info network stats websocket: %s", websocketURL) for { _, message, err := wsConn.ReadMessage() if err != nil { - common.Log.Errorf("Failed to receive message on network stats websocket; %s", err) + common.Log.Errorf("failed to receive message on network stats websocket; %s", err) break } else { - common.Log.Debugf("Received %d-byte message on network stats websocket for network: %s", len(message), *network.Name) + common.Log.Tracef("received %d-byte message on network stats websocket for network: %s", len(message), *network.Name) response := map[string]interface{}{} err := json.Unmarshal(message, &response) if err != nil { - common.Log.Warningf("Failed to unmarshal message received on network stats websocket: %s; %s", message, err.Error()) + common.Log.Warningf("failed to unmarshal message received on network stats websocket: %s; %s", message, err.Error()) } else { if op, opok := response["op"].(string); opok && op == "block" { if header, headerOk := response["x"].(map[string]interface{}); headerOk { - common.Log.Debugf("Received block header on blockchain.info network stats websocket subscription: %s", websocketURL) + common.Log.Tracef("received block header on blockchain.info network stats websocket subscription: %s", websocketURL) ch <- &provide.NetworkStatus{ Meta: map[string]interface{}{ blockchainInfoWebsocketURL: true, @@ -208,13 +209,13 @@ func BcoinNetworkStatsDataSourceFactory(network *network.Network) *NetworkStatsD client, err = rpcclient.New(cfg, &rpcclient.NotificationHandlers{ OnClientConnected: func() { - common.Log.Debugf("Bitcoin websocket client connected on configured network stats websocket for network: %s", *network.Name) + common.Log.Debugf("bitcoin websocket client connected on configured network stats websocket for network: %s", *network.Name) // Register for block connect and disconnect notifications. if err := client.NotifyBlocks(); err != nil { } else { if err != nil { - common.Log.Errorf("Failed to establish network stats websocket subscription to %s for network: %s; %s", websocketURL, *network.Name, err.Error()) + common.Log.Errorf("failed to establish network stats websocket subscription to %s for network: %s; %s", websocketURL, *network.Name, err.Error()) client.Disconnect() return } @@ -222,8 +223,8 @@ func BcoinNetworkStatsDataSourceFactory(network *network.Network) *NetworkStatsD }, OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txns []*btcutil.Tx) { - common.Log.Debugf("Received block header on network stats websocket for network: %s; height: %d", *network.Name, height) - common.Log.Debugf("Block connected: %v (%d) %v", header.BlockHash(), height, header.Timestamp) + common.Log.Debugf("received block header on network stats websocket for network: %s; height: %d", *network.Name, height) + common.Log.Debugf("block connected: %v (%d) %v", header.BlockHash(), height, header.Timestamp) ch <- &provide.NetworkStatus{ Meta: map[string]interface{}{ @@ -233,21 +234,21 @@ func BcoinNetworkStatsDataSourceFactory(network *network.Network) *NetworkStatsD }, OnFilteredBlockDisconnected: func(height int32, header *wire.BlockHeader) { - common.Log.Debugf("Received block disconnected header on network stats websocket for network: %s; height: %d", *network.Name, height) - common.Log.Debugf("Block disconnected: %v (%d) %v", header.BlockHash(), height, header.Timestamp) + common.Log.Debugf("received block disconnected header on network stats websocket for network: %s; height: %d", *network.Name, height) + common.Log.Debugf("block disconnected: %v (%d) %v", header.BlockHash(), height, header.Timestamp) }, OnUnknownNotification: func(method string, params []json.RawMessage) { - common.Log.Warningf("Unknown notification received on bitcoin network stats websocket; method: %s; %s", method, params) + common.Log.Warningf("unknown notification received on bitcoin network stats websocket; method: %s; %s", method, params) }, }) if err != nil { - common.Log.Errorf("Failed to establish network stats websocket connection to %s for network: %s; %s", websocketURL, *network.Name, err.Error()) + common.Log.Errorf("failed to establish network stats websocket connection to %s for network: %s; %s", websocketURL, *network.Name, err.Error()) return err } - common.Log.Debugf("Subscribed to network stats websocket: %s", websocketURL) + common.Log.Debugf("subscribed to network stats websocket: %s", websocketURL) client.WaitForShutdown() } @@ -275,7 +276,7 @@ func EthereumNetworkStatsDataSourceFactory(network *network.Network) *NetworkSta var wsDialer websocket.Dialer wsConn, _, err := wsDialer.Dial(websocketURL, nil) if err != nil { - common.Log.Errorf("Failed to establish network stats websocket connection to %s; %s", websocketURL, err.Error()) + common.Log.Errorf("failed to establish network stats websocket connection to %s; %s", websocketURL, err.Error()) } else { defer wsConn.Close() id, _ := uuid.NewV4() @@ -286,21 +287,21 @@ func EthereumNetworkStatsDataSourceFactory(network *network.Network) *NetworkSta "jsonrpc": "2.0", } if err := wsConn.WriteJSON(payload); err != nil { - common.Log.Errorf("Failed to write subscribe message to network stats websocket connection") + common.Log.Errorf("failed to write subscribe message to network stats websocket connection") } else { - common.Log.Debugf("Subscribed to network stats websocket: %s", websocketURL) + common.Log.Debugf("subscribed to network stats websocket: %s", websocketURL) for { _, message, err := wsConn.ReadMessage() if err != nil { - common.Log.Errorf("Failed to receive message on network stats websocket; %s", err) + common.Log.Errorf("failed to receive message on network stats websocket; %s", err) break } else { - common.Log.Debugf("Received %d-byte message on network stats websocket for network: %s", len(message), *network.Name) + common.Log.Tracef("received %d-byte message on network stats websocket for network: %s", len(message), *network.Name) response := &provide.EthereumWebsocketSubscriptionResponse{} err := json.Unmarshal(message, response) if err != nil { - common.Log.Warningf("Failed to unmarshal message received on network stats websocket: %s; %s", message, err.Error()) + common.Log.Warningf("failed to unmarshal message received on network stats websocket: %s; %s", message, err.Error()) } else { if result, ok := response.Params["result"].(map[string]interface{}); ok { if _, mixHashOk := result["mixHash"]; !mixHashOk { @@ -313,7 +314,7 @@ func EthereumNetworkStatsDataSourceFactory(network *network.Network) *NetworkSta header := &types.Header{} err := json.Unmarshal(resultJSON, header) if err != nil { - common.Log.Warningf("Failed to stringify result JSON in otherwise valid message received on network stats websocket: %s; %s", response, err.Error()) + common.Log.Warningf("failed to stringify result JSON in otherwise valid message received on network stats websocket: %s; %s", response, err.Error()) } else if header != nil && header.Number != nil { ch <- &provide.NetworkStatus{ Meta: map[string]interface{}{ @@ -333,34 +334,105 @@ func EthereumNetworkStatsDataSourceFactory(network *network.Network) *NetworkSta } } +// BaseledgerNetworkStatsDataSourceFactory builds and returns a JSON-RPC and streaming websocket +// data source which is used by stats daemon instances to consume EVM-based network statistics +func BaseledgerNetworkStatsDataSourceFactory(network *network.Network) *NetworkStatsDataSource { + return &NetworkStatsDataSource{ + Network: network, + + Poll: func(ch chan *provide.NetworkStatus) error { + return new(jsonRpcNotSupported) + }, + + Stream: func(ch chan *provide.NetworkStatus) error { + websocketURL := network.WebsocketURL() + if websocketURL == "" { + err := new(websocketNotSupported) + return *err + } + var wsDialer websocket.Dialer + wsConn, _, err := wsDialer.Dial(websocketURL, nil) + if err != nil { + common.Log.Errorf("Failed to establish network stats websocket connection to %s; %s", websocketURL, err.Error()) + } else { + defer wsConn.Close() + // { "jsonrpc": "2.0", "method": "subscribe", "params": ["tm.event='NewBlock'"], "id": 1 } + payload := map[string]interface{}{ + "method": "subscribe", + "params": []string{"tm.event='NewBlockHeader'"}, + "id": 1, + "jsonrpc": "2.0", + } + if err := wsConn.WriteJSON(payload); err != nil { + common.Log.Errorf("Failed to write subscribe message to network stats websocket connection") + } else { + common.Log.Debugf("Subscribed to network stats websocket: %s", websocketURL) + for { + _, message, err := wsConn.ReadMessage() + if err != nil { + common.Log.Errorf("Failed to receive message on network stats websocket; %s", err) + break + } else { + common.Log.Debugf("Received %d-byte message on network stats websocket for network: %s", len(message), *network.Name) + response := &provide.BaseledgerSubscriptionResponse{} + err := json.Unmarshal(message, response) + if err != nil { + common.Log.Warningf("Failed to unmarshal message received on network stats websocket: %s; %s", message, err.Error()) + } else { + if result, ok := response.Result["data"].(map[string]interface{}); ok { + if resultJSON, err := json.Marshal(result); err == nil { + headerResponse := &nchain.BaseledgerBlockHeaderResponse{} + err := json.Unmarshal(resultJSON, headerResponse) + if err != nil { + common.Log.Warningf("Failed to stringify result JSON in otherwise valid message received on network stats websocket: %s; %s", response, err.Error()) + } else if headerResponse != nil { + common.Log.Debugf("Block header received %v\n", headerResponse.Value.Header) + ch <- &provide.NetworkStatus{ + Meta: map[string]interface{}{ + "last_block_header": headerResponse.Value.Header, + }, + } + } + } + } + } + } + } + } + } + return err + }, + } +} + // Consume the websocket stream; attempts to fallback to JSON-RPC if websocket stream fails or is not available for the network func (sd *StatsDaemon) consume() []error { errs := make([]error, 0) - sd.log.Debugf("Attempting to consume configured stats daemon data source; attempt #%v", sd.attempt) + sd.log.Debugf("attempting to consume configured stats daemon data source; attempt #%v", sd.attempt) var err error if sd.dataSource != nil { err = sd.dataSource.Stream(sd.queue) } else { - err = errors.New("Configured stats daemon does not have a configured data source") + err = errors.New("configured stats daemon does not have a configured data source") } if err != nil { errs = append(errs, err) switch err.(type) { case jsonRpcNotSupported: - sd.log.Warningf("Configured stats daemon data source does not support JSON-RPC; attempting to upgrade to websocket stream for network id: %s", sd.dataSource.Network.ID) + sd.log.Warningf("configured stats daemon data source does not support JSON-RPC; attempting to upgrade to websocket stream for network id: %s", sd.dataSource.Network.ID) err := sd.dataSource.Stream(sd.queue) if err != nil { errs = append(errs, err) - sd.log.Warningf("Configured stats daemon data source returned error while consuming JSON-RPC endpoint: %s; restarting stream...", err.Error()) + sd.log.Warningf("configured stats daemon data source returned error while consuming JSON-RPC endpoint: %s; restarting stream...", err.Error()) } case websocketNotSupported: - sd.log.Warningf("Configured stats daemon data source does not support streaming via websocket; attempting to fallback to JSON-RPC long polling using stats daemon for network id: %s", sd.dataSource.Network.ID) + sd.log.Warningf("configured stats daemon data source does not support streaming via websocket; attempting to fallback to JSON-RPC long polling using stats daemon for network id: %s", sd.dataSource.Network.ID) err := sd.dataSource.Poll(sd.queue) if err != nil { errs = append(errs, err) - sd.log.Warningf("Configured stats daemon data source returned error while consuming JSON-RPC endpoint: %s; restarting stream...", err.Error()) + sd.log.Warningf("configured stats daemon data source returned error while consuming JSON-RPC endpoint: %s; restarting stream...", err.Error()) } } } @@ -370,7 +442,7 @@ func (sd *StatsDaemon) consume() []error { func (sd *StatsDaemon) ingest(response interface{}) { defer func() { if r := recover(); r != nil { - common.Log.Warningf("Recovered from failed stats daemon message ingestion attempt; %s", r) + common.Log.Warningf("recovered from failed stats daemon message ingestion attempt; %s", r) } }() @@ -458,7 +530,7 @@ func (sd *StatsDaemon) ingestBcoin(response interface{}) { sd.stats.Syncing = sd.stats.Block == 0 if sd.stats.Block == 0 { - common.Log.Debugf("Ignoring genesis header") + common.Log.Debugf("ignoring genesis header") return } } @@ -473,10 +545,6 @@ func (sd *StatsDaemon) ingestBcoin(response interface{}) { merkleRoot, _ := header["merkleroot"].(string) - // chainptID := fmt.Sprintf("provide.%s.block", sd.dataSource.Network.ID) - // chainptHash := []byte(merkleRoot) - // providechainpoint.ImmortalizeHashes(chainptID, []*[]byte{&chainptHash}) - if len(sd.recentBlocks) == 0 || sd.recentBlocks[len(sd.recentBlocks)-1].(map[string]interface{})["merkleroot"].(string) != merkleRoot { sd.recentBlocks = append(sd.recentBlocks, header) sd.recentBlockTimestamps = append(sd.recentBlockTimestamps, lastBlockAt) @@ -508,27 +576,21 @@ func (sd *StatsDaemon) ingestBcoin(response interface{}) { sd.stats.Meta["last_block_hash"] = merkleRoot } } else if medianTime, medianTimeOk := chainInfo["mediantime"].(float64); medianTimeOk { - // This is pretty naive but gives us an avg. time before we have >= 3 recent blocks; can take some time after statsdaemon starts monitoring a PoW network... + // This is pretty naive but gives us an avg. time before we have >= 3 recent blocks; + // can take some time after statsdaemon starts monitoring a PoW network... sd.stats.Meta["average_blocktime"] = (float64(time.Now().Unix()) - medianTime) / (11.0 / 2.0) } } else { - common.Log.Warningf("Failed to parse last_block_header from *provide.NetworkStats meta; dropping message...") + common.Log.Warningf("failed to parse last_block_header from *provide.NetworkStats meta; dropping message...") } } else { - common.Log.Warningf("Received malformed *provide.NetworkStats message; dropping message...") + common.Log.Warningf("received malformed *provide.NetworkStats message; dropping message...") } } sd.publish() } -func (sd *StatsDaemon) ingestLcoin(response interface{}) { - switch response.(type) { - default: - common.Log.Warningf("Lcoin ingest functionality not yet implemented in stats daemon") - } -} - func (sd *StatsDaemon) ingestEthereum(response interface{}) { switch response.(type) { case *provide.NetworkStatus: @@ -546,16 +608,16 @@ func (sd *StatsDaemon) ingestEthereum(response interface{}) { hdr := &types.Header{} err := json.Unmarshal(headerJSON, hdr) if err != nil { - common.Log.Warningf("Failed to stringify result JSON in otherwise valid message received via JSON-RPC: %s; %s", response, err.Error()) + common.Log.Warningf("failed to stringify result JSON in otherwise valid message received via JSON-RPC: %s; %s", response, err.Error()) } else if hdr != nil && hdr.Number != nil { sd.ingest(hdr) } } } else { - common.Log.Warningf("Failed to parse last_block_header from *provide.NetworkStats meta; dropping message...") + common.Log.Warningf("failed to parse last_block_header from *provide.NetworkStats meta; dropping message...") } } else { - common.Log.Warningf("Received malformed *provide.NetworkStats message; dropping message...") + common.Log.Warningf("received malformed *provide.NetworkStats message; dropping message...") } case *types.Header: header := response.(*types.Header) @@ -564,7 +626,7 @@ func (sd *StatsDaemon) ingestEthereum(response interface{}) { sd.stats.Syncing = sd.stats.Block == 0 if sd.stats.Block == 0 { - common.Log.Debugf("Ignoring genesis header") + common.Log.Debugf("ignoring genesis header") return } @@ -575,10 +637,6 @@ func (sd *StatsDaemon) ingestEthereum(response interface{}) { blockHash := header.Hash().String() - // chainptID := fmt.Sprintf("provide.%s.block", sd.dataSource.Network.ID) - // chainptHash := []byte(blockHash) - // providechainpoint.ImmortalizeHashes(chainptID, []*[]byte{&chainptHash}) - if len(sd.recentBlocks) == 0 || sd.recentBlocks[len(sd.recentBlocks)-1].(*types.Header).Hash().String() != blockHash { sd.recentBlocks = append(sd.recentBlocks, header) sd.recentBlockTimestamps = append(sd.recentBlockTimestamps, lastBlockAt) @@ -633,7 +691,7 @@ func (sd *StatsDaemon) loop() error { sd.ingest(msg) case <-sd.shutdownCtx.Done(): - sd.log.Debugf("Closing stats daemon on shutdown") + sd.log.Debugf("closing stats daemon on shutdown") return nil } } @@ -654,15 +712,17 @@ func (sd *StatsDaemon) publish() error { // EvictNetworkStatsDaemon evicts a single, previously-initialized stats daemon instance { func EvictNetworkStatsDaemon(network *network.Network) error { + currentNetworkStatsMutex.Lock() + defer currentNetworkStatsMutex.Unlock() + if daemon, ok := currentNetworkStats[network.ID.String()]; ok { - common.Log.Debugf("Evicting stats daemon instance for network: %s; id: %s", *network.Name, network.ID) + common.Log.Debugf("evicting stats daemon instance for network: %s; id: %s", *network.Name, network.ID) daemon.shutdown() - currentNetworkStatsMutex.Lock() delete(currentNetworkStats, network.ID.String()) - currentNetworkStatsMutex.Unlock() return nil } - return fmt.Errorf("Unable to evict stats daemon instance for network: %s; id; %s", *network.Name, network.ID) + + return fmt.Errorf("unable to evict stats daemon instance for network: %s; id; %s", *network.Name, network.ID) } // RequireNetworkStatsDaemon ensures a single stats daemon instance is running for @@ -671,18 +731,16 @@ func EvictNetworkStatsDaemon(network *network.Network) error { func RequireNetworkStatsDaemon(network *network.Network) *StatsDaemon { var daemon *StatsDaemon if daemon, ok := currentNetworkStats[network.ID.String()]; ok { - common.Log.Debugf("Cached stats daemon instance found for network: %s; id: %s", *network.Name, network.ID) + common.Log.Debugf("cached stats daemon instance found for network: %s; id: %s", *network.Name, network.ID) return daemon } - currentNetworkStatsMutex.Lock() - common.Log.Infof("Initializing new stats daemon instance for network: %s; id: %s", *network.Name, network.ID) + common.Log.Infof("initializing new stats daemon instance for network: %s; id: %s", *network.Name, network.ID) daemon = NewNetworkStatsDaemon(common.Log, network) if daemon != nil { currentNetworkStats[network.ID.String()] = daemon go daemon.run() } - currentNetworkStatsMutex.Unlock() return daemon } @@ -690,6 +748,9 @@ func RequireNetworkStatsDaemon(network *network.Network) *StatsDaemon { // NewNetworkStatsDaemon initializes a new network stats daemon instance using // NetworkStatsDataSourceFactory to construct the daemon's its data source func NewNetworkStatsDaemon(lg *logger.Logger, network *network.Network) *StatsDaemon { + currentNetworkStatsMutex.Lock() + defer currentNetworkStatsMutex.Unlock() + sd := new(StatsDaemon) sd.attempt = 0 sd.log = lg.Clone() @@ -700,8 +761,10 @@ func NewNetworkStatsDaemon(lg *logger.Logger, network *network.Network) *StatsDa sd.dataSource = BcoinNetworkStatsDataSourceFactory(network) } else if network.IsEthereumNetwork() { sd.dataSource = EthereumNetworkStatsDataSourceFactory(network) + } else if network.IsBaseledgerNetwork() { + sd.dataSource = BaseledgerNetworkStatsDataSourceFactory(network) } - //sd.handleSignals() + // sd.handleSignals() if sd.dataSource == nil { return nil @@ -711,7 +774,7 @@ func NewNetworkStatsDaemon(lg *logger.Logger, network *network.Network) *StatsDa if chainID == nil { chn, err := providecrypto.EVMGetChainID(network.ID.String(), network.RPCURL()) if err != nil { - common.Log.Debugf("Error getting chain id for %s network. Error: %s", network.ID.String(), err.Error()) + common.Log.Warningf("failed to retrieve chain id for %s network. Error: %s", network.ID.String(), err.Error()) return nil } _chainID := hexutil.EncodeBig(chn) @@ -731,10 +794,10 @@ func (sd *StatsDaemon) run() error { go func() { for !sd.shuttingDown() { sd.attempt++ - common.Log.Debugf("Stepping into main runloop of stats daemon instance; attempt #%v", sd.attempt) + common.Log.Debugf("stepping into main runloop of stats daemon instance; attempt #%v", sd.attempt) errs := sd.consume() if len(errs) > 0 { - common.Log.Warningf("Configured stats daemon data source returned %v error(s) while attempting to consume configured data source", len(errs)) + common.Log.Warningf("configured stats daemon data source returned %v error(s) while attempting to consume configured data source", len(errs)) if sd.backoff == 0 { sd.backoff = 100 } else { @@ -752,10 +815,10 @@ func (sd *StatsDaemon) run() error { err := sd.loop() if err == nil { - sd.log.Info("Stats daemon exited cleanly") + sd.log.Info("stats daemon exited cleanly") } else { if !sd.shuttingDown() { - common.Log.Errorf("Forcing shutdown of stats daemon due to error; %s", err) + common.Log.Errorf("forcing shutdown of stats daemon due to error; %s", err) sd.shutdown() } } @@ -764,14 +827,14 @@ func (sd *StatsDaemon) run() error { } func (sd *StatsDaemon) handleSignals() { - common.Log.Debug("Installing SIGINT and SIGTERM signal handlers") + common.Log.Debug("installing SIGINT and SIGTERM signal handlers") sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { select { case sig := <-sigs: - common.Log.Infof("Received signal: %s", sig) + common.Log.Infof("received signal: %s", sig) sd.shutdown() case <-sd.shutdownCtx.Done(): close(sigs) @@ -781,7 +844,7 @@ func (sd *StatsDaemon) handleSignals() { func (sd *StatsDaemon) shutdown() { if atomic.AddUint32(&sd.closing, 1) == 1 { - common.Log.Debugf("Shutting down stats daemon instance for network: %s", *sd.dataSource.Network.Name) + common.Log.Debugf("shutting down stats daemon instance for network: %s", *sd.dataSource.Network.Name) sd.cancelF() } } diff --git a/go.mod b/go.mod index 7501a45e..d6897f41 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/onsi/gomega v1.10.1 github.com/prometheus/tsdb v0.10.0 // indirect github.com/provideplatform/ident v0.9.10-0.20210727215953-cdd9940805ce - github.com/provideplatform/provide-go v0.0.0-20210701150626-bff9948fbd9a + github.com/provideplatform/provide-go v0.0.0-20210822183020-1c080cdc4a77 github.com/spaolacci/murmur3 v1.1.1-0.20190317074736-539464a789e9 // indirect github.com/status-im/keycard-go v0.0.0-20191119114148-6dd40a46baa0 // indirect go.mongodb.org/mongo-driver v1.5.1 diff --git a/go.sum b/go.sum index 1e4cbfbb..fa2eb0fa 100644 --- a/go.sum +++ b/go.sum @@ -576,8 +576,8 @@ github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSg github.com/provideplatform/ident v0.9.10-0.20210727215953-cdd9940805ce h1:0JZPWGnkBdFRGN/ptm49gMebcmMkLqnRukExazrvwCQ= github.com/provideplatform/ident v0.9.10-0.20210727215953-cdd9940805ce/go.mod h1:A0IzIOtqzS+MgpSTTKSQITY4kPH48IsGXw5pDCcegTU= github.com/provideplatform/provide-go v0.0.0-20210624064849-d7328258f0d8/go.mod h1:q0/Q8KaZxYg84rdwBIIE7ZwHluzM5zw7zJJoJOqAbzg= -github.com/provideplatform/provide-go v0.0.0-20210701150626-bff9948fbd9a h1:9NekZCQm1lPSlh76Vu+mSlrr99ofxYMzdzltBCHcu9Q= -github.com/provideplatform/provide-go v0.0.0-20210701150626-bff9948fbd9a/go.mod h1:q0/Q8KaZxYg84rdwBIIE7ZwHluzM5zw7zJJoJOqAbzg= +github.com/provideplatform/provide-go v0.0.0-20210822183020-1c080cdc4a77 h1:+/LcHOVu85ySEFunR/6f8k4GcBBb/xug86pGHkohc6I= +github.com/provideplatform/provide-go v0.0.0-20210822183020-1c080cdc4a77/go.mod h1:q0/Q8KaZxYg84rdwBIIE7ZwHluzM5zw7zJJoJOqAbzg= github.com/provideservices/provide-go v0.0.0-20210409104111-70ad008e4ae8 h1:QaP1R8tsO69Pt5FQ2y8JZNpH5Dwk5xeNRcoethNcx3U= github.com/provideservices/provide-go v0.0.0-20210409104111-70ad008e4ae8/go.mod h1:zMvfECorm5+hb9aewWEVpxkaFGko3r5Qk3/N/aIuLeY= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= diff --git a/network/consumer.go b/network/consumer.go index ec2ade4c..08f65d2b 100644 --- a/network/consumer.go +++ b/network/consumer.go @@ -16,8 +16,8 @@ import ( ) const natsBlockFinalizedSubject = "nchain.block.finalized" -const natsBlockFinalizedSubjectMaxInFlight = 2048 -const natsBlockFinalizedInvocationTimeout = time.Second * 30 +const natsBlockFinalizedSubjectMaxInFlight = 4096 +const natsBlockFinalizedInvocationTimeout = time.Second * 15 const natsBlockFinalizedTimeout = int64(time.Minute * 1) const natsResolveNodePeerURLSubject = "nchain.node.peer.resolve" @@ -131,18 +131,18 @@ func consumeBlockFinalizedMsg(msg *stan.Msg) { } }() - common.Log.Debugf("Consuming NATS block finalized message: %s", msg) + common.Log.Debugf("consuming NATS block finalized message: %s", msg) var err error blockFinalizedMsg := &natsBlockFinalizedMsg{} err = json.Unmarshal(msg.Data, &blockFinalizedMsg) if err != nil { - common.Log.Warningf("Failed to unmarshal block finalized message; %s", err.Error()) + common.Log.Warningf("failed to unmarshal block finalized message; %s", err.Error()) return } if blockFinalizedMsg.NetworkID == nil { - err = fmt.Errorf("Parsed NATS block finalized message did not contain network id: %s", msg) + err = fmt.Errorf("parsed NATS block finalized message did not contain network id: %s", msg) } if err == nil { @@ -152,7 +152,7 @@ func consumeBlockFinalizedMsg(msg *stan.Msg) { db.Where("id = ?", blockFinalizedMsg.NetworkID).Find(&network) if network == nil || network.ID == uuid.Nil { - err = fmt.Errorf("Failed to retrieve network by id: %s", *blockFinalizedMsg.NetworkID) + err = fmt.Errorf("failed to retrieve network by id: %s", *blockFinalizedMsg.NetworkID) } if err == nil { @@ -193,16 +193,16 @@ func consumeBlockFinalizedMsg(msg *stan.Msg) { } } } else { - err = fmt.Errorf("Failed to decode EVM block header; %s", err.Error()) + err = fmt.Errorf("failed to decode EVM block header; %s", err.Error()) } } else { - common.Log.Warningf("Received unhandled finalized block header; network id: %s", *blockFinalizedMsg.NetworkID) + common.Log.Warningf("received unhandled finalized block header; network id: %s", *blockFinalizedMsg.NetworkID) } } } if err != nil { - common.Log.Warningf("Failed to handle block finalized message; %s", err.Error()) + common.Log.Warningf("failed to handle block finalized message; %s", err.Error()) natsutil.AttemptNack(msg, natsBlockFinalizedTimeout) } else { msg.Ack() @@ -216,12 +216,12 @@ func consumeResolveNodePeerURLMsg(msg *stan.Msg) { } }() - common.Log.Debugf("Consuming NATS resolve node peer url message: %s", msg) + common.Log.Debugf("consuming NATS resolve node peer url message: %s", msg) var params map[string]interface{} err := json.Unmarshal(msg.Data, ¶ms) if err != nil { - common.Log.Warningf("Failed to umarshal resolve node peer url message; %s", err.Error()) + common.Log.Warningf("failed to umarshal resolve node peer url message; %s", err.Error()) natsutil.Nack(msg) return } @@ -229,7 +229,7 @@ func consumeResolveNodePeerURLMsg(msg *stan.Msg) { nodeID, nodeIDOk := params["node_id"].(string) if !nodeIDOk { - common.Log.Warningf("Failed to resolve peer url for node; no node id provided") + common.Log.Warningf("failed to resolve peer url for node; no node id provided") natsutil.Nack(msg) return } @@ -239,14 +239,14 @@ func consumeResolveNodePeerURLMsg(msg *stan.Msg) { node := &Node{} db.Where("id = ?", nodeID).Find(&node) if node == nil || node.ID == uuid.Nil { - common.Log.Warningf("Failed to resolve node; no node resolved for id: %s", nodeID) + common.Log.Warningf("failed to resolve node; no node resolved for id: %s", nodeID) natsutil.Nack(msg) return } err = node.resolvePeerURL(db) if err != nil { - common.Log.Debugf("Attempt to resolve node peer url did not succeed; %s", err.Error()) + common.Log.Debugf("attempt to resolve node peer url did not succeed; %s", err.Error()) natsutil.AttemptNack(msg, natsResolveNodePeerURLTimeout) return } @@ -267,12 +267,12 @@ func consumeAddNodePeerMsg(msg *stan.Msg) { } }() - common.Log.Debugf("Consuming NATS add peer message: %s", msg) + common.Log.Debugf("consuming NATS add peer message: %s", msg) var params map[string]interface{} err := json.Unmarshal(msg.Data, ¶ms) if err != nil { - common.Log.Warningf("Failed to umarshal add peer message; %s", err.Error()) + common.Log.Warningf("failed to umarshal add peer message; %s", err.Error()) natsutil.Nack(msg) return } @@ -281,13 +281,13 @@ func consumeAddNodePeerMsg(msg *stan.Msg) { peerURL, peerURLOk := params["peer_url"].(string) if !nodeIDOk { - common.Log.Warningf("Failed to add network peer; no node id provided") + common.Log.Warningf("failed to add network peer; no node id provided") natsutil.Nack(msg) return } if !peerURLOk { - common.Log.Warningf("Failed to add network peer; no peer url provided") + common.Log.Warningf("failed to add network peer; no peer url provided") natsutil.Nack(msg) return } @@ -297,7 +297,7 @@ func consumeAddNodePeerMsg(msg *stan.Msg) { node := &Node{} db.Where("id = ?", nodeID).Find(&node) if node == nil || node.ID == uuid.Nil { - common.Log.Warningf("Failed to resolve node; no node resolved for id: %s", nodeID) + common.Log.Warningf("failed to resolve node; no node resolved for id: %s", nodeID) natsutil.Nack(msg) return } @@ -319,12 +319,12 @@ func consumeRemoveNodePeerMsg(msg *stan.Msg) { } }() - common.Log.Debugf("Consuming NATS remove peer message: %s", msg) + common.Log.Debugf("consuming NATS remove peer message: %s", msg) var params map[string]interface{} err := json.Unmarshal(msg.Data, ¶ms) if err != nil { - common.Log.Warningf("Failed to umarshal remove peer message; %s", err.Error()) + common.Log.Warningf("failed to umarshal remove peer message; %s", err.Error()) natsutil.Nack(msg) return } @@ -333,13 +333,13 @@ func consumeRemoveNodePeerMsg(msg *stan.Msg) { peerURL, peerURLOk := params["peer_url"].(string) if !nodeIDOk { - common.Log.Warningf("Failed to remove network peer; no node id provided") + common.Log.Warningf("failed to remove network peer; no node id provided") natsutil.Nack(msg) return } if !peerURLOk { - common.Log.Warningf("Failed to remove network peer; no peer url provided") + common.Log.Warningf("failed to remove network peer; no peer url provided") natsutil.Nack(msg) return } @@ -349,14 +349,14 @@ func consumeRemoveNodePeerMsg(msg *stan.Msg) { node := &Node{} db.Where("id = ?", nodeID).Find(&node) if node == nil || node.ID == uuid.Nil { - common.Log.Warningf("Failed to resolve node; no node resolved for id: %s", nodeID) + common.Log.Warningf("failed to resolve node; no node resolved for id: %s", nodeID) natsutil.Nack(msg) return } err = node.removePeer(peerURL) if err != nil { - common.Log.Debugf("Attempt to remove network peer failed; %s", err.Error()) + common.Log.Debugf("attempt to remove network peer failed; %s", err.Error()) natsutil.AttemptNack(msg, natsRemoveNodePeerTimeout) return } diff --git a/network/network.go b/network/network.go index 976d1876..923c6b99 100644 --- a/network/network.go +++ b/network/network.go @@ -55,6 +55,7 @@ const networkConfigIsHandshakeNetwork = "is_handshake_network" const networkConfigIsHyperledgerBesuNetwork = "is_hyperledger_besu_network" const networkConfigIsHyperledgerFabricNetwork = "is_hyperledger_fabric_network" const networkConfigIsQuorumNetwork = "is_quorum_network" +const networkConfigIsBaseledgerNetwork = "is_baseledger_network" const networkConfigEnvBootnodes = "BOOTNODES" const networkConfigEnvClient = "CLIENT" @@ -868,6 +869,16 @@ func (n *Network) IsHandshakeNetwork() bool { return false } +func (n *Network) IsBaseledgerNetwork() bool { + cfg := n.ParseConfig() + if cfg != nil { + if isBaseledgerNetwork, ok := cfg[networkConfigIsBaseledgerNetwork].(bool); ok { + return isBaseledgerNetwork + } + } + return false +} + // P2PAPIClient returns an instance of the network's underlying p2p.API, if that is possible given the network config func (n *Network) P2PAPIClient() (p2p.API, error) { cfg := n.ParseConfig() @@ -897,6 +908,8 @@ func (n *Network) P2PAPIClient() (p2p.API, error) { apiClient = p2p.InitParityP2PProvider(common.StringOrNil(rpcURL), n.ID.String(), n) case p2p.ProviderQuorum: apiClient = p2p.InitQuorumP2PProvider(common.StringOrNil(rpcURL), n.ID.String(), n) + case p2p.ProviderBaseledger: + apiClient = p2p.InitBaseledgerP2PProvider(common.StringOrNil(rpcURL), n.ID.String(), n) default: return nil, fmt.Errorf("Failed to resolve p2p provider for network %s; unsupported client", n.ID) } diff --git a/network/node.go b/network/node.go index d872c72a..93c722ec 100644 --- a/network/node.go +++ b/network/node.go @@ -735,6 +735,8 @@ func (n *Node) P2PAPIClient() (p2p.API, error) { var apiClient p2p.API switch client { + case p2p.ProviderBaseledger: + apiClient = p2p.InitBaseledgerP2PProvider(rpcURL, n.NetworkID.String(), n.Network) case p2p.ProviderBcoin: return nil, fmt.Errorf("Bcoin p2p provider not yet implemented") case p2p.ProviderGeth: diff --git a/network/p2p/baseledger.go b/network/p2p/baseledger.go new file mode 100644 index 00000000..0153cc87 --- /dev/null +++ b/network/p2p/baseledger.go @@ -0,0 +1,158 @@ +package p2p + +import ( + "encoding/json" + "errors" + "fmt" + "math/big" + "strconv" + "strings" + + "github.com/jinzhu/gorm" + uuid "github.com/kthomas/go.uuid" + "github.com/provideplatform/nchain/common" + provide "github.com/provideplatform/provide-go/api" + nchain "github.com/provideplatform/provide-go/api/nchain" +) + +// BaseledgerP2PProvider is a network.p2p.API implementing the baseledger API +type BaseledgerP2PProvider struct { + rpcClientKey *string + rpcURL *string + network common.Configurable + networkID string +} + +// BaseledgerP2PProvider initializes and returns the baseledger p2p provider +func InitBaseledgerP2PProvider(rpcURL *string, networkID string, ntwrk common.Configurable) *BaseledgerP2PProvider { + return &BaseledgerP2PProvider{ + rpcClientKey: rpcURL, + rpcURL: rpcURL, + network: ntwrk, + networkID: networkID, + } +} + +// DefaultEntrypoint returns the default entrypoint to run when starting the container, when one is not otherwise provided +func (p *BaseledgerP2PProvider) DefaultEntrypoint() []string { + return []string{} +} + +// EnrichStartCommand returns the cmd to append to the command to start the container +func (p *BaseledgerP2PProvider) EnrichStartCommand(bootnodes []string) []string { + cmd := make([]string, 0) + + return cmd +} + +// AcceptNonReservedPeers allows non-reserved peers to connect +func (p *BaseledgerP2PProvider) AcceptNonReservedPeers() error { + return errors.New("not yet implemented") +} + +// DropNonReservedPeers only allows reserved peers to connect; reversed by calling `AcceptNonReservedPeers` +func (p *BaseledgerP2PProvider) DropNonReservedPeers() error { + return errors.New("not yet implemented") +} + +// AddPeer adds a peer by its peer url +func (p *BaseledgerP2PProvider) AddPeer(peerURL string) error { + return errors.New("not yet implemented") +} + +// FetchTxReceipt fetch a transaction receipt given its hash +func (p *BaseledgerP2PProvider) FetchTxReceipt(signerAddress, hash string) (*nchain.TxReceipt, error) { + httpClient := &provide.Client{ + Host: *p.rpcURL, + Scheme: "http", + } + + status, resp, err := httpClient.Get("tx", map[string]interface{}{"hash": hash}) + + if err != nil { + return nil, err + } + + if status != 200 { + respJSON, _ := json.Marshal(resp) + return nil, errors.New(string(respJSON)) + } + + txEntity := &nchain.TendermintTx{} + respJSON, _ := json.Marshal(resp) + json.Unmarshal(respJSON, &txEntity) + + status, resp, err = httpClient.Get("block", map[string]interface{}{"height": txEntity.Result.Height}) + + if err != nil { + return nil, err + } + + if status != 200 { + respJSON, _ := json.Marshal(resp) + return nil, errors.New(string(respJSON)) + } + + blockEntity := &nchain.TendermintBlock{} + respJSON, _ = json.Marshal(resp) + json.Unmarshal(respJSON, &blockEntity) + + gasUsed, _ := strconv.Atoi(txEntity.Result.TxResult.GasUsed) + n := new(big.Int) + blockNumber, _ := n.SetString(txEntity.Result.Height, 10) + var logs []interface{} + json.Unmarshal([]byte(txEntity.Result.TxResult.Log), &logs) + return &nchain.TxReceipt{ + TxHash: []byte(fmt.Sprintf("0x%s", string(txEntity.Result.Hash))), + ContractAddress: nil, + GasUsed: uint64(gasUsed), + BlockHash: []byte(fmt.Sprintf("0x%s", string(blockEntity.Result.BlockID.Hash))), + BlockNumber: blockNumber, + TransactionIndex: 0, + PostState: nil, + Status: uint64(txEntity.Result.TxResult.Code), + CumulativeGasUsed: uint64(gasUsed), + Bloom: nil, + Logs: logs, + }, nil +} + +// FetchTxTraces fetch transaction traces given its hash +func (p *BaseledgerP2PProvider) FetchTxTraces(hash string) (*nchain.TxTrace, error) { + return nil, errors.New("not yet implemented") +} + +// FormatBootnodes formats the given peer urls as a valid bootnodes param +func (p *BaseledgerP2PProvider) FormatBootnodes(bootnodes []string) string { + return strings.Join(bootnodes, ",") +} + +// ParsePeerURL parses a peer url from the given raw log string +func (p *BaseledgerP2PProvider) ParsePeerURL(string) (*string, error) { + return nil, errors.New("not yet implemented") +} + +// RemovePeer removes a peer by its peer url +func (p *BaseledgerP2PProvider) RemovePeer(peerURL string) error { + return errors.New("not yet implemented") +} + +// ResolvePeerURL attempts to resolve one or more viable peer urls +func (p *BaseledgerP2PProvider) ResolvePeerURL() (*string, error) { + return nil, errors.New("not yet implemented") +} + +// ResolveTokenContract attempts to resolve the given token contract details for the contract at a given address +func (p *BaseledgerP2PProvider) ResolveTokenContract(signerAddress string, receipt interface{}, artifact *nchain.CompiledArtifact) (*string, *string, *big.Int, *string, error) { + return nil, nil, nil, nil, errors.New("not yet implemented") +} + +// RequireBootnodes attempts to resolve the peers to use as bootnodes +func (p *BaseledgerP2PProvider) RequireBootnodes(db *gorm.DB, userID *uuid.UUID, networkID *uuid.UUID, n common.Configurable) error { + return errors.New("not yet implemented") +} + +// Upgrade executes a pending upgrade +func (p *BaseledgerP2PProvider) Upgrade() error { + return errors.New("not yet implemented") +} diff --git a/network/p2p/common.go b/network/p2p/common.go index d8f085e3..01bfbf1d 100644 --- a/network/p2p/common.go +++ b/network/p2p/common.go @@ -58,6 +58,9 @@ const ProviderParity = "parity" // ProviderQuorum quorum p2p provider const ProviderQuorum = "quorum" +// ProviderBaseledger baseledger p2p provider +const ProviderBaseledger = "baseledger" + const tokenTypeERC20 = "ERC-20" const tokenTypeERC721 = "ERC-721" diff --git a/network/p2p/fabric.go b/network/p2p/fabric.go index 6735b293..0fa633d7 100644 --- a/network/p2p/fabric.go +++ b/network/p2p/fabric.go @@ -49,7 +49,7 @@ func (p *HyperledgerFabricP2PProvider) EnrichStartCommand(bootnodes []string) [] } // FetchTxReceipt fetch a transaction receipt given its hash -func (p *HyperledgerFabricP2PProvider) FetchTxReceipt(hash, signerAddress string) (*provide.TxReceipt, error) { +func (p *HyperledgerFabricP2PProvider) FetchTxReceipt(signerAddress, hash string) (*provide.TxReceipt, error) { return nil, errors.New("fabric does not impl FetchTxReceipt()") } diff --git a/network/p2p/geth.go b/network/p2p/geth.go index 92fd59b0..f8ea9781 100644 --- a/network/p2p/geth.go +++ b/network/p2p/geth.go @@ -107,10 +107,10 @@ func (p *GethP2PProvider) FetchTxReceipt(signerAddress, hash string) (*provide.T } return &provide.TxReceipt{ - TxHash: receipt.TxHash, - ContractAddress: receipt.ContractAddress, + TxHash: receipt.TxHash.Bytes(), + ContractAddress: receipt.ContractAddress.Bytes(), GasUsed: receipt.GasUsed, - BlockHash: receipt.BlockHash, + BlockHash: receipt.BlockHash.Bytes(), BlockNumber: receipt.BlockNumber, TransactionIndex: receipt.TransactionIndex, PostState: receipt.PostState, diff --git a/network/p2p/nethermind.go b/network/p2p/nethermind.go index a638deff..ab880cc1 100644 --- a/network/p2p/nethermind.go +++ b/network/p2p/nethermind.go @@ -55,10 +55,10 @@ func (p *NethermindP2PProvider) FetchTxReceipt(signerAddress, hash string) (*pro } return &provide.TxReceipt{ - TxHash: receipt.TxHash, - ContractAddress: receipt.ContractAddress, + TxHash: receipt.TxHash.Bytes(), + ContractAddress: receipt.ContractAddress.Bytes(), GasUsed: receipt.GasUsed, - BlockHash: receipt.BlockHash, + BlockHash: receipt.BlockHash.Bytes(), BlockNumber: receipt.BlockNumber, TransactionIndex: receipt.TransactionIndex, PostState: receipt.PostState, diff --git a/network/p2p/parity.go b/network/p2p/parity.go index 73debac3..7739ef4e 100644 --- a/network/p2p/parity.go +++ b/network/p2p/parity.go @@ -135,10 +135,10 @@ func (p *ParityP2PProvider) FetchTxReceipt(signerAddress, hash string) (*provide } return &provide.TxReceipt{ - TxHash: receipt.TxHash, - ContractAddress: receipt.ContractAddress, + TxHash: receipt.TxHash.Bytes(), + ContractAddress: receipt.ContractAddress.Bytes(), GasUsed: receipt.GasUsed, - BlockHash: receipt.BlockHash, + BlockHash: receipt.BlockHash.Bytes(), BlockNumber: receipt.BlockNumber, TransactionIndex: receipt.TransactionIndex, PostState: receipt.PostState, diff --git a/network/p2p/quorum.go b/network/p2p/quorum.go index d0c0535c..d205ef6d 100644 --- a/network/p2p/quorum.go +++ b/network/p2p/quorum.go @@ -124,10 +124,10 @@ func (p *QuorumP2PProvider) FetchTxReceipt(signerAddress, hash string) (*provide } return &provide.TxReceipt{ - TxHash: receipt.TxHash, - ContractAddress: receipt.ContractAddress, + TxHash: receipt.TxHash.Bytes(), + ContractAddress: receipt.ContractAddress.Bytes(), GasUsed: receipt.GasUsed, - BlockHash: receipt.BlockHash, + BlockHash: receipt.BlockHash.Bytes(), BlockNumber: receipt.BlockNumber, TransactionIndex: receipt.TransactionIndex, PostState: receipt.PostState, diff --git a/ops/run_integration_tests_statsdaemon.sh b/ops/run_integration_tests_statsdaemon.sh new file mode 100644 index 00000000..37ef2eee --- /dev/null +++ b/ops/run_integration_tests_statsdaemon.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +docker volume prune -f +docker build -t nchain/local --no-cache . +docker-compose -f ./ops/docker-compose.yml up -d +TAGS=$LOCAL_TAGS ./ops/run_local_tests_statsdaemon.sh +docker-compose -f ./ops/docker-compose.yml down +docker volume rm ops_provide-db diff --git a/ops/run_local_tests_statsdaemon.sh b/ops/run_local_tests_statsdaemon.sh new file mode 100644 index 00000000..42874c01 --- /dev/null +++ b/ops/run_local_tests_statsdaemon.sh @@ -0,0 +1,308 @@ +#!/bin/bash + +set -e +echo "" > coverage.txt + +if [[ -z "${DATABASE_NAME}" ]]; then + DATABASE_NAME=nchain_dev +fi + +if [[ -z "${DATABASE_USER}" ]]; then + DATABASE_USER=nchain +fi + +if [[ -z "${DATABASE_PASSWORD}" ]]; then + DATABASE_PASSWORD=nchain +fi + +if [[ -z "${NATS_SERVER_PORT}" ]]; then + NATS_SERVER_PORT=4221 +fi + +if [[ -z "${NATS_STREAMING_SERVER_PORT}" ]]; then + NATS_STREAMING_SERVER_PORT=4222 +fi + +if [[ -z "${NATS_STREAMING_CONCURRENCY}" ]]; then + NATS_STREAMING_CONCURRENCY=1 +fi + +if [[ -z "${NATS_CONCURRENCY}" ]]; then + NATS_CONCURRENCY=1 +fi + +if [[ -z "${REDIS_SERVER_PORT}" ]]; then + REDIS_SERVER_PORT=6379 +fi + +if [[ -z "${RACE}" ]]; then + RACE=true +fi + +if [[ -z "${TAGS}" ]]; then + TAGS=unit +fi + +#dropdb $DATABASE_NAME || true >/dev/null +#dropuser $DATABASE_USER || true >/dev/null + +#PGPASSWORD=$DATABASE_SUPERUSER_PASSWORD dropdb -U $DATABASE_SUPERUSER -h 0.0.0.0 -p $DATABASE_PORT $DATABASE_NAME || true >/dev/null +#PGPASSWORD=$DATABASE_SUPERUSER_PASSWORD dropuser -U $DATABASE_SUPERUSER -h 0.0.0.0 -p $DATABASE_PORT $DATABASE_USER || true >/dev/null + +DATABASE_USER=$DATABASE_USER DATABASE_PASSWORD=$DATABASE_PASSWORD DATABASE_NAME=$DATABASE_NAME make migrate + +PGP_PUBLIC_KEY='-----BEGIN PGP PUBLIC KEY BLOCK----- + +mQINBF1Db+IBEAC0nRf3s6rls6jhWeWWTAJY8Nn4+qPUbSu0ZOx1DAqOHHxYAek1 +TOuogsXaFPRtRL5mO+0aRIDjqo6GKp9IC8k6XFlJ/+LU1C09O5XOkbzhVoHtTHOY +dvLY1N3Pw5tzemFnbjMVrbTcuLgVAZoW9+e1GTUJT/VUL6AVYhg51U3r8sOuiUX5 +wJrpGF4dhtOUc6pv3aBuG/iqA7vrJ8lME/3kdUZIMcs+StqJBxBuk/GykPAp5de9 +vofqVd8h1aZKBjHCcdDvGDK2bLqyVk+0lE8zoh/2HG+52y/dqdVt6VEsRuf96Cou +pGeftbXKkgHv4pf0ySrNoXr3bkZmuf+SJyfF+hBq34G4zGVdT3IYH5Dwsd+ScQ72 +KVI9XuO3sny+TUSWIXjWFTpQ0mhtjMhdHngXERBcgmdaS5JfmgGev3l0tqOBWhXA +oObRQ8oPhWhF20sM7LHWZSqbf4GGiVShCK6RxlRm2Uwhl1Fjx+1ThtKkq+JUgPrs +hCtk0CZVXlKIbJjrvRhJ7x/fjDEyfXur4wscHrGJr45M+3ts1dRhKUxKpNl8k9p1 +RXEEntNcsV0FAZz4B0l7ImGVOKK9mdlcRLVMZ37NC5QeEoOcglHB3wGpMMmXcZU1 +ZlRkQt0M/FE/PU4pKXtqiyGUZP/EFzY2O+adZZCXAbgmdbC+ktsnQCWpCwARAQAB +tCpwcm92aWRlLWRldiA8ZW5naW5lZXJpbmdAcHJvdmlkZS5zZXJ2aWNlcz6JAk4E +EwEIADgWIQS8yJWwHRuPxwaoDKhhrI+FPBEK6AUCXUNv4gIbAwULCQgHAgYVCgkI +CwIEFgIDAQIeAQIXgAAKCRBhrI+FPBEK6MHrEACm8uJ2Xc20vnXmJCuMqL3KsR75 +JKcGJw8G6z3EpRjV8FeZTfOpjO/joe+X7HrCgKq8RTfnoduApEYY7Jut5Cqlw9VF +ImQMfYUBOjMzrfbkBMngjd4P7FcFAOL+amgYP85whSoKZL1EdJkpiScM16i4rvAv +LHC8BzLS8XrkF52uMV4uaFDlgI1VVhm/Q0U/9g/WJBHXXugEpnuttT3TT0rD8BLY +bIxRdsli8M0N+c2BBfISA5kNUl2j7MEqhKPuDXWHHRBkDxKwrk/mROjJWexOtUTl +GR+WFPs2W4ikhhX51zPUCYnrhm/WFrjy+xwNveaNGrk4pr3Hm6YEGnTD5H71t4vW +ezlpbfA7aPLq6/HX30JsLUpFyl/PTE4BAAizuu04HbnRHlNBsITa7pbf2MMN4tSe +3uxcHql8BDmm3RSSK2N1vwMszySPmapUVKi3W6qduTBgT9WCd77mt20259Lyurqu +s7UF0CvrHfxGeyrMkXbv13xwG+wqWpI2hmiOTkEg2zhxap16sWhQVNJP/r1sqqUa +BwQ8cGI62hEwd9rF49etznl4TmzqUvQCx+VEnbArT/7nUKHmGPXAcI7g0hMJj0N6 +LQXF5w8xlHFXOfzpDNN7YzobARgiWC41NezPZKhv4y8Cy2riUsju5qRyIN37Ipy1 +rE2pI5SJ+jU80Z+G2bkCDQRdQ2/iARAArsi1Zuqv9raZWLLcAgyk1ed6KtmvWxTH +/zf6E37P2dh+Z/7pGJp6xkHUdtgaj7EABEZfzGJnyo11Pna7u8se0j1ZtxwazjS+ +B48KW9eM0K9m8eje0RkP99jg3naZGV1LnJKt2g68h5rsibfBJYg7/tYxBpqILavW +CCg9xTDsN0X9D5hQdg2pclG9OsSmK72rBbzuEkKnhY99U+2LluCIi5bX07gYuZ9G +lbWtX6IKlvPx/FDLaOqUWrtJ0WYQMb92TqimbRe6MG2vLzPIKrn8MkaSSADd9nhT +rnKD61hr4kFr+sd+iyr1nKg97ppd4dWaL88oDEeKE9xO3H391ihz0stbt2AfD9R0 +2i86UfY06lLfEukiqlwUGnvxVntjDZmqJWZBG7mLtilSC1THpifPMJ2OogSJRMJ9 +wZVelI2aK6uSrqkzW8dutKhcRmlRVDCtMdpFJncxuyLneVWKBMH8SMVenYrbrkqq +1NN/25H4Wy7A/4MCvMHoBRMQheiFBEWx+JEkWnDFeVA2ZGREmQsqncG81GN/pULW +shlCEgMU3LrKoB2wzfgBjVXjKq/+7utuwKshkEH2y6g5IPDOOHtfdXNqZ3Ph3r0l +k/KqWY56csJ0ytUXVYpcr1u7VxS/yoOQQBkh481CSJfV7eRJ5/epEQKdxP4sPFCv +00SZv1/XyvkAEQEAAYkCNgQYAQgAIBYhBLzIlbAdG4/HBqgMqGGsj4U8EQroBQJd +Q2/iAhsMAAoJEGGsj4U8EQroTusP/jKLeLOaqXWIlZqSWR5kaYUWeEvtZWVWQLsa +kqKEIFwTSq5HBWIfyZxtPTmo3JN+78ufkYJXg0Hs/sTIWbs3juoDSgLvql2ME1M2 +a3Dgr2OPkMPnEReQ8jlcRcseJqqdl2CpFG6pQzmof8UZZ3G//GHIop6Fss29+hjg +fDAOc/YXwcrqVs0cPpXSBU+XJwH7zIvkVfRKFeoieOqK8mHhgDP6FTJ2tyR8+UDa +NsaB7rF2LJyh4UtJLImlftH/1yYWjaYxjsOqiqiTA5NPbiEKtfnK54740YVJouAq +CCoQV6vwMbvre3izUSCPcmHWdo1Lpy8MV1vrDto7biptubqM6Sli0BUlP6/EhFnY +DaKMm9iZvsS6q5mHIcmZxuqMkJ6gheMISqN4tzoGshOpE+XPRk6GgJp6GxTsEuim +MBjvqsdVq8lqp1kGWdSZDgnOPOPwvW3mIp8nEwY7b8hp/YGz6pylaQaCKeACQkfd +r82Hnp5/o36qMdudanGGvP28qbXmEan/VyKQuReBJ2JQKLUpFpCWUOaho6dcP6Zo +jNfRxN+DUEJNER0oZUTEeEno3BfRYkpQ/EZjtQ9muVh2S8UVL06OV0f5deOxicP4 +65KorIgQeczCg8iX3Pt1ZojYNW4YOnrEys22ZaI6+hmvLf9Zx4u4ip+tTIaEkJoi +96kKxfZ6 +=HwI4 +-----END PGP PUBLIC KEY BLOCK-----' + +PGP_PRIVATE_KEY='-----BEGIN PGP PRIVATE KEY BLOCK----- + +lQdGBF1Db+IBEAC0nRf3s6rls6jhWeWWTAJY8Nn4+qPUbSu0ZOx1DAqOHHxYAek1 +TOuogsXaFPRtRL5mO+0aRIDjqo6GKp9IC8k6XFlJ/+LU1C09O5XOkbzhVoHtTHOY +dvLY1N3Pw5tzemFnbjMVrbTcuLgVAZoW9+e1GTUJT/VUL6AVYhg51U3r8sOuiUX5 +wJrpGF4dhtOUc6pv3aBuG/iqA7vrJ8lME/3kdUZIMcs+StqJBxBuk/GykPAp5de9 +vofqVd8h1aZKBjHCcdDvGDK2bLqyVk+0lE8zoh/2HG+52y/dqdVt6VEsRuf96Cou +pGeftbXKkgHv4pf0ySrNoXr3bkZmuf+SJyfF+hBq34G4zGVdT3IYH5Dwsd+ScQ72 +KVI9XuO3sny+TUSWIXjWFTpQ0mhtjMhdHngXERBcgmdaS5JfmgGev3l0tqOBWhXA +oObRQ8oPhWhF20sM7LHWZSqbf4GGiVShCK6RxlRm2Uwhl1Fjx+1ThtKkq+JUgPrs +hCtk0CZVXlKIbJjrvRhJ7x/fjDEyfXur4wscHrGJr45M+3ts1dRhKUxKpNl8k9p1 +RXEEntNcsV0FAZz4B0l7ImGVOKK9mdlcRLVMZ37NC5QeEoOcglHB3wGpMMmXcZU1 +ZlRkQt0M/FE/PU4pKXtqiyGUZP/EFzY2O+adZZCXAbgmdbC+ktsnQCWpCwARAQAB +/gcDAnQZdqwxtBcf6c4goheF+3hZirCCimY2u8A8vRU89kH0evSv7yyHMikCBb+o +lf3l+iWRcPDnwYfCCdVsou0ED1o+5CZGOlzo7ZZ6xdYTzLOthMhva8lxeADXh0j1 +Pm/VPj9sl8CU7ghBK4wb9gkBBUqBUJ5Mdk+pSkmcK5Xrh8dkcseN5LF7KpQfGSO6 +1BjiRQj56uLwjwcnqLABvlp83cBkXYOQAMr3Fk8GaFVccNkvBqfGO7U3Fk5vxrZR +x2iZubwNMp0Erie2mj3hZqNtP4nB+iXylraNObgjTQBLYUfAsJBSg0Kg8IkqZOMM +dEJanl8w6gLi+19kk8rZpYHAdulaWT2rCRd5tAJfLjJ/amSyJYG99zofDM7CJrcV +5/3TdP/5yOoRlVOMpd2Mnu8W4G8sNJHNZPLlc6WoyHESldTGnWVODuIN5lcSuh1F +Xg1DE05utT+kDruk3GT5CWnTm5Hq8GLUFltGJ0aoob5PYjT6b6/+ZPB0vZQO3jzD +aUHjpU6bN9vwKqxbusM+94mLX1/W01CLZxVHU5mSj/iUG6iMsQdZN2xluFfzsJ2g +8t3izfMqPdoF3LFTyAjsBcZK+UOr8DkBk9wJEJAGUDZtMYZXx59cAYuG7w1dkcKI +yMRl3K0zIc+OZMUC+83xkxFpoEp2xcV/UOx9LgFsGyVnqe/MMmWsslNVopOWLL8l +7oaHN1UAujHQ3r3+inJqEpLqScu/p/xapVjsWE76vCFhGtXmpe7AzH59bzqNMuOD +WuAC3tNJeKI8/gVLrx4AHqhCw1PaY35OXL8LU4JdoEp0wB5oedJLCDac7obY0ILu +kFhJg7DtZZMSsKIshfqc0Yy3Ij6wlN2EbTO0iGDRaORJ1Sp6d3N46NfLVUfmR5MW +CsTxvvY9JAdX2DtVxGgwGkpO9irZ3G/gucnORtVweCxM24yxbgdlv8g9DQtmcvyS +/dCVNLGYbu0dNOYtxIwx0QGzqwiJKoOs2fiUKe7IPz0t1RyD/SpE/5NaanD5Ii8s +1/DiZonc4FILVUoLGoEy6uE6FSKAgULg3MgE4c19/DBSTC5Y1sACsBbyKObW4uS6 +Oxmi6X43mBuEa/1G18+0Yv6+5JlAE+EggpLqR/oIO+i0Wsg0O+l4Jmv8KPxm/FPg +vJYjKuKCGtXnPGARRNOWueQjk9RA2HG9kSvKUklI1wN6ucthS4nbGsurWS+zwtHV +6GHbjDX+RoXJTkHHIznpMGGI6E38cIe1NjxLlcWKajXtSyHXxDHkmaNxphSn9BSt +QV6TzmV/vQDDqXpUXbWjubtlfJzrikic/Sg71b1gvcPuLvLYptZ9P+9NxPx9/8ID +eMkLdHb76+6Y4va/mBLYJEqDqq7L7/6ceY/gGcUHEhH64/ItEzhnLpI03Ycp90zN +zbOv0QSZwrxBPkT2KEtYAfmIgm0U0as8fsqBE8u2okCFAkPRDSKN2vQbIkzCEj1I +Ejk9phH8qqi8R0Ti+SJLr6AFD/LR3oVV1YbgsSknjR8YB6qmeH/zNAKY+1kwOVLv +PuMWMVWzgYqrmO9Ho9ibYwVNzh9FvetvDQImLAz/jFGsst+K8EXdBuq1r3MpEhrF +IE4Nzu0/gioIfAxcgZjJDWJAn4ILwAyb+kSD1bafmmhaMyKqwoqPLbSfXc88G5Hx +ba1bBFC0njZ7qfB11/1vvQ96Z+wKyK8So3eQU2prAe8S3VN6VV52OLPAa0hg3vH5 +wGEbwRJjzu0xn/7sSvxTER8bU/Wiw0h0y+BH7tpYgJBNPRLVv93TvH20KnByb3Zp +ZGUtZGV2IDxlbmdpbmVlcmluZ0Bwcm92aWRlLnNlcnZpY2VzPokCTgQTAQgAOBYh +BLzIlbAdG4/HBqgMqGGsj4U8EQroBQJdQ2/iAhsDBQsJCAcCBhUKCQgLAgQWAgMB +Ah4BAheAAAoJEGGsj4U8EQrowesQAKby4nZdzbS+deYkK4yovcqxHvkkpwYnDwbr +PcSlGNXwV5lN86mM7+Oh75fsesKAqrxFN+eh24CkRhjsm63kKqXD1UUiZAx9hQE6 +MzOt9uQEyeCN3g/sVwUA4v5qaBg/znCFKgpkvUR0mSmJJwzXqLiu8C8scLwHMtLx +euQXna4xXi5oUOWAjVVWGb9DRT/2D9YkEdde6ASme621PdNPSsPwEthsjFF2yWLw +zQ35zYEF8hIDmQ1SXaPswSqEo+4NdYcdEGQPErCuT+ZE6MlZ7E61ROUZH5YU+zZb +iKSGFfnXM9QJieuGb9YWuPL7HA295o0auTimvcebpgQadMPkfvW3i9Z7OWlt8Dto +8urr8dffQmwtSkXKX89MTgEACLO67TgdudEeU0GwhNrult/Yww3i1J7e7FweqXwE +OabdFJIrY3W/AyzPJI+ZqlRUqLdbqp25MGBP1YJ3vua3bTbn0vK6uq6ztQXQK+sd +/EZ7KsyRdu/XfHAb7CpakjaGaI5OQSDbOHFqnXqxaFBU0k/+vWyqpRoHBDxwYjra +ETB32sXj163OeXhObOpS9ALH5USdsCtP/udQoeYY9cBwjuDSEwmPQ3otBcXnDzGU +cVc5/OkM03tjOhsBGCJYLjU17M9kqG/jLwLLauJSyO7mpHIg3fsinLWsTakjlIn6 +NTzRn4bZnQdGBF1Db+IBEACuyLVm6q/2tplYstwCDKTV53oq2a9bFMf/N/oTfs/Z +2H5n/ukYmnrGQdR22BqPsQAERl/MYmfKjXU+dru7yx7SPVm3HBrONL4Hjwpb14zQ +r2bx6N7RGQ/32ODedpkZXUuckq3aDryHmuyJt8EliDv+1jEGmogtq9YIKD3FMOw3 +Rf0PmFB2DalyUb06xKYrvasFvO4SQqeFj31T7YuW4IiLltfTuBi5n0aVta1fogqW +8/H8UMto6pRau0nRZhAxv3ZOqKZtF7owba8vM8gqufwyRpJIAN32eFOucoPrWGvi +QWv6x36LKvWcqD3uml3h1ZovzygMR4oT3E7cff3WKHPSy1u3YB8P1HTaLzpR9jTq +Ut8S6SKqXBQae/FWe2MNmaolZkEbuYu2KVILVMemJ88wnY6iBIlEwn3BlV6UjZor +q5KuqTNbx260qFxGaVFUMK0x2kUmdzG7Iud5VYoEwfxIxV6dituuSqrU03/bkfhb +LsD/gwK8wegFExCF6IUERbH4kSRacMV5UDZkZESZCyqdwbzUY3+lQtayGUISAxTc +usqgHbDN+AGNVeMqr/7u627AqyGQQfbLqDkg8M44e191c2pnc+HevSWT8qpZjnpy +wnTK1RdVilyvW7tXFL/Kg5BAGSHjzUJIl9Xt5Enn96kRAp3E/iw8UK/TRJm/X9fK ++QARAQAB/gcDAp/NuTo7+BdI6YiI8RRw2uy8ZoyKJT2D76uX9/U10Ej2MLlfWfED +h+s6M+9q3rWcLctwZ4NHgowcT+CgJ8muxcxbpfhjHtWHOipl1YArUJQCoW1Fiwyy +aktQM4KBudAm2+TwcxetSRxn6YyAZLMs0j/Ax/7/Q0pLhqpmodV42CcOXhmhMLRn +/MpHt19HfopE1RrUXlgr0jA8gtiz6vi10j93tKNgL7Va7rjgx4NbJB/MIQzI+9GW +U4xb9eOBLpv2JD65PsjJZaqfGsiJPOwBtHXBTtt/9a4nskaX442DqLXvzp6750Ot +stD2tV8ONMfiR/C4UlVijOfL4yT579AXnuMZvpYenSuTMDHmcLkARUBK/xt50C45 +AWXGXvsUXKTDrljtixaUlj8ADMeTRan3ZjoiAGWfNFAzBPzhoq+vSBsWDxE8U5zH +P+pQJEHm935HbZ30kK6FYo5BXr1ak8enSq1UGGZN6JSoHcL7KQdQPmJPxNpc7GVA +KUNrpwvOaQx39N9dfO6p4FpGhAw//YLBlssmvK/rST6Hg98nzQcflpoAI4SPXYPH +Y5TO75PpYvH5InlKNf4B0rMpvblJRnIGQ9LTHrqRZWAh6DnB2kXsHVEm3FX1tQC7 +0zMtrS5cgIFgC8v0OWLJpgV3y76szvfoWiAVq6ltIf0FG74Hgxm9Eldzc467aSKx +0u0GA0xa6FYjIwdts5dEUk00yKjzhx7aDEo7PZ2ipFfU0vDSD0sq94EZ+yqJN+LZ +xlH5s/JV1wbthoxABOGA3fVJu61O6vYFj3W63USwRgTgpvZz8+LnsligEiksC2rL +GP8WDZ96Hjl/0atS/yciDI2yyscEF/eLMqVq65Yecfx1VPxNL2f/e2g5TpudAa4l +YRexENyRagtn1jsT13qLVSK253n5GEFySTpih18gjHX2OUzOShDlg0m5smbnE1aN +eUswuIOVfX0T2GqleFNlQ0PD5DkHbR1WHIZzahi9QT8Rzqqnk7zguaejLUayaHjw +SGZ68AHP6whOlU1pbjtRlKINeKm3G9UXZfjs51EYflpeaqUUB/XWYDjcRO6P0J3w +Ul1YIp2geD5+F2gVc8HLiHD5PZThI69r2ERv/f/aS4XVezchOln/cb/eyPZGw04v ++N1NUzOhitn5vpRJGKFtioRfVXMSlYxszBAp6JUQvkEMO3rznTlnR0XzPuCrD/Zv +TsCIC1X34S+5q0A0PvZivHBwJo7zsMu7JJF1/ESQ9KOmLUpBVnETXv2ATfRP5tBs +GQ5LilYbQ8SDiGKfOo6eP1tOgksBRkXu+tXPBONn8hvPJllSycK7JqTXSckVqZvT +b/APmWPa4VyxY5oMjRnCq5vskhvd7Wi9ig1R1pJwe2jixb6fy8vCxt/lUfkkm1oZ +u5Orm6+QQKlpt2N4wZJzVuoWDkCLD4qPnILRbh5MHOx0UuXZ2S+dUkTf+O07fpjt +TsNavKHkTjQ83tGuVKZz49ozO8/QwgkZrfgmRrJgHHI38HnMyShKN9bFzVKnjHaq +dRCTj5HNpkPtzqY4V2ueacRtqVp3rZ037dUJ28jYKFatHyBm11p3PSpB2kqVLm5A +lBva7JyIpliDUZqe6EKU26PJL2/ThLz8qx435aStMjEw2YLH6UXcblk6B7CjkpfR +wBTWl8F4BLq2XaC/lmepG/OFzl7W3dYhNh6obToX3b5yS72xG75Q6U+CJnvBH5QX +q8tZ1LDtO1UIAqCYFfmdY9ywmIxIb0iD7R/ZZ5HBKxj1fU2OMVIQo3Rt5AlDGEiJ +AjYEGAEIACAWIQS8yJWwHRuPxwaoDKhhrI+FPBEK6AUCXUNv4gIbDAAKCRBhrI+F +PBEK6E7rD/4yi3izmql1iJWaklkeZGmFFnhL7WVlVkC7GpKihCBcE0quRwViH8mc +bT05qNyTfu/Ln5GCV4NB7P7EyFm7N47qA0oC76pdjBNTNmtw4K9jj5DD5xEXkPI5 +XEXLHiaqnZdgqRRuqUM5qH/FGWdxv/xhyKKehbLNvfoY4HwwDnP2F8HK6lbNHD6V +0gVPlycB+8yL5FX0ShXqInjqivJh4YAz+hUydrckfPlA2jbGge6xdiycoeFLSSyJ +pX7R/9cmFo2mMY7DqoqokwOTT24hCrX5yueO+NGFSaLgKggqEFer8DG763t4s1Eg +j3Jh1naNS6cvDFdb6w7aO24qbbm6jOkpYtAVJT+vxIRZ2A2ijJvYmb7EuquZhyHJ +mcbqjJCeoIXjCEqjeLc6BrITqRPlz0ZOhoCaehsU7BLopjAY76rHVavJaqdZBlnU +mQ4Jzjzj8L1t5iKfJxMGO2/Iaf2Bs+qcpWkGgingAkJH3a/Nh56ef6N+qjHbnWpx +hrz9vKm15hGp/1cikLkXgSdiUCi1KRaQllDmoaOnXD+maIzX0cTfg1BCTREdKGVE +xHhJ6NwX0WJKUPxGY7UPZrlYdkvFFS9OjldH+XXjsYnD+OuSqKyIEHnMwoPIl9z7 +dWaI2DVuGDp6xMrNtmWiOvoZry3/WceLuIqfrUyGhJCaIvepCsX2eg== +=1A2t +-----END PGP PRIVATE KEY BLOCK-----' + +PGP_PASSPHRASE=password + +PAYMENTS_REFRESH_TOKEN='eyJhbGciOiJSUzI1NiIsImtpZCI6ImU2OmY3OmQ1OjI0OmUyOjU5OjA2OjJiOmJjOmEyOjhjOjM1OjlkOmNhOjBhOjg3IiwidHlwIjoiSldUIn0.eyJhdWQiOiJodHRwczovL2lkZW50LnByb3ZpZGUuc2VydmljZXMvYXBpL3YxIiwiaWF0IjoxNjAwNzA5NzU0LCJpc3MiOiJodHRwczovL2lkZW50LnByb3ZpZGUuc2VydmljZXMiLCJqdGkiOiJjN2I2YzI2ZS00OTkwLTQ4YWYtYmMwYy05YWRiY2E5ZmRmNzYiLCJuYXRzIjp7InBlcm1pc3Npb25zIjp7InN1YnNjcmliZSI6eyJhbGxvdyI6WyJhcHBsaWNhdGlvbi4wMTU1NGUyMi0zZDdhLTQ0YTMtOWM2NS02YmNhYmFhMDhjMzgiLCJuZXR3b3JrLiouY29ubmVjdG9yLioiLCJuZXR3b3JrLiouc3RhdHVzIiwicGxhdGZvcm0uXHUwMDNlIl19fX0sInBydmQiOnsiYXBwbGljYXRpb25faWQiOiIwMTU1NGUyMi0zZDdhLTQ0YTMtOWM2NS02YmNhYmFhMDhjMzgiLCJleHRlbmRlZCI6eyJwZXJtaXNzaW9ucyI6eyIqIjo1MTB9fSwicGVybWlzc2lvbnMiOjUxMH0sInN1YiI6ImFwcGxpY2F0aW9uOjAxNTU0ZTIyLTNkN2EtNDRhMy05YzY1LTZiY2FiYWEwOGMzOCJ9.iPYYSS0hHNYLUXcgpBfQbo6goMGDHF5Oxv1OvkB-WAzRgZSAm2HFroOUsmPlCQwO5eNeTfMqRaQMDdl6idTCip99y-zYTu8ys7dahyk4P1lhh4BB8vTCl3AHQuyUTGloMrY2JytpkmXMZTsxu-UhQxaaQN0IlSotSIFAYPT3jHH5nYy2MJbcfxePt8xKmXzwvpjTEVJRmUfAfEXjJF34S3hAuw9S7WncKucZfuP1WwP65h53HbLB69DR6KFZ76eiRavke5RpT40r9UKC6zPP-UZhTAuWQjOSmBhkd_IUg4T2a8r4W9CJT6aLgtwE0i1OUrPDVj_EzQV9tsjlwIOv5y9r_p-sfdxXdHFfoT8nAs5uIcWTw45J2Ycc0b4vqs-sYDr2qn7TS5DvJbPQSnRBS9YZ8CJq9mFpc5GjunCzEqO6JkvEWaN1mqPJbcvMGmLRQt5zA-2D0fFq1mvIUCUcg3EQ5J5lAZqudGf9mnYf4xRIMacCssF5VsP36xXg7pnscqh3u3JdQ-Fon3nB5vbIXn2fxaJjYl4ggNr-IgLxK7_h9KlDkiv7I7EKWGl2Np0q3-mVvuTIk7M-GqT3Dx9TtpR6MsK6EX0frUH3bZH8RHBHnxx67oxNMamviT-XUNudUU7Wan1PfnaPSsqfrn6OT5Abep-BbewKJn3ErY0Z-oU' +NCHAIN_API_HOST='localhost:8080' +NCHAIN_API_PATH='api/v1' +NCHAIN_API_SCHEME='http' +IDENT_API_HOST='localhost:8081' +IDENT_API_PATH='api/v1' +IDENT_API_SCHEME='http' +VAULT_API_HOST='localhost:8082' +VAULT_API_PATH='api/v1' +VAULT_API_SCHEME='http' +VAULT_SEAL_UNSEAL_KEY='traffic charge swing glimpse will citizen push mutual embrace volcano siege identify gossip battle casual exit enrich unlock muscle vast female initial please day' +VAULT_REFRESH_TOKEN='eyJhbGciOiJSUzI1NiIsImtpZCI6IjEwOjJlOmQ5OmUxOmI4OmEyOjM0OjM3Ojk5OjNhOjI0OmZjOmFhOmQxOmM4OjU5IiwidHlwIjoiSldUIn0.eyJhdWQiOiJodHRwczovL3Byb3ZpZGUuc2VydmljZXMvYXBpL3YxIiwiaWF0IjoxNjA1NzkxMjQ4LCJpc3MiOiJodHRwczovL2lkZW50LnByb3ZpZGUuc2VydmljZXMiLCJqdGkiOiI5YjUxNGIxNS01NTdlLTRhYWQtYTcwOC0wMTcwZTAwZWE1ZmIiLCJuYXRzIjp7InBlcm1pc3Npb25zIjp7InN1YnNjcmliZSI6eyJhbGxvdyI6WyJhcHBsaWNhdGlvbi4zNjAxNTdmOC1kNWExLTQ0NDAtOTE4Yi1mNjhiYjM5YzBkODAiLCJ1c2VyLjIzY2MwN2UwLTM4NTEtNDBkZC1iNjc1LWRmNzY4MDY3MmY3ZCIsIm5ldHdvcmsuKi5jb25uZWN0b3IuKiIsIm5ldHdvcmsuKi5zdGF0dXMiLCJwbGF0Zm9ybS5cdTAwM2UiXX19fSwicHJ2ZCI6eyJhcHBsaWNhdGlvbl9pZCI6IjM2MDE1N2Y4LWQ1YTEtNDQ0MC05MThiLWY2OGJiMzljMGQ4MCIsImV4dGVuZGVkIjp7InBlcm1pc3Npb25zIjp7IioiOjUxMH19LCJwZXJtaXNzaW9ucyI6NTEwLCJ1c2VyX2lkIjoiMjNjYzA3ZTAtMzg1MS00MGRkLWI2NzUtZGY3NjgwNjcyZjdkIn0sInN1YiI6ImFwcGxpY2F0aW9uOjM2MDE1N2Y4LWQ1YTEtNDQ0MC05MThiLWY2OGJiMzljMGQ4MCJ9.SUh84MKBNstdu3KFu1zEAQq03xbPw1D0lLXeogz1HfBJy77bIGf7HLvCuc6bjkh0xj3cEuEus1dC1Dj3BvlZoSXsvz_biTzSapkXzJjpkwOL6qkYDmqTPZvXwqmk-mUNrHTPkqdiIJL7xA46tzHW3E_hjSA9HjEk1kXjPdJQ6_ifkgWNoAaSD--kudIrhZ7vLnfy0H1JEAOsXzSAMoc5_pNG2n79m0ywvb_4l9BqdsHW8N3xSQOFjcp9gD_tqo6ffug3pkpoy-RSguM_OaMR2lj_CHhYxAt0phtjUceDD3K1h5iZ38kSl7izhOdULMmGBhVpBMoSy6_R6ZzpCL3pj8FcReX9RXR5oYpm8PDtlmWqblQzjwY00-uYLfOX0_iS4MGfEsjadZPfTmJLcOTYC7H4PL9ZRu_XtMDUrGBQQz5b_ad2ZzMXbBNeU6vbxVKDG8VFKWOHAemqHTcvuOAsOCLIqOu-eJpZHlXbx-FXPTYledd-GBDe7IjaC9ll_JK3utCOnCq0qUs6lnXIrQ_Sp1LcTKJJ7aY5f9TxeoAuL-ghDbQ3Xkw6huKyPCz2evOwVLwrB9ZRMlQXgmTnB1OeQvWii1WbmkyV1Zhbz_RPB8ckK7_mFxuPvsXK8wTFiWFmj96sRX470kV-ooSfM5CzKZhSLqgyyaUNC0VaCPq0uuE' + +#pkgs=(bridge common connector consumer contract db filter network oracle prices token tx wallet) +pkgs=("cmd/statsdaemon") + +for d in "${pkgs[@]}" ; do + pkg=$(echo $d | sed 's/\/*$//g') + + if [ "$RACE" = "true" ]; then + PGP_PUBLIC_KEY=${PGP_PUBLIC_KEY} \ + PGP_PRIVATE_KEY=$PGP_PRIVATE_KEY \ + PGP_PASSPHRASE=$PGP_PASSPHRASE \ + NATS_TOKEN=testtoken \ + NATS_URL=nats://localhost:${NATS_SERVER_PORT} \ + NATS_STREAMING_URL=nats://localhost:${NATS_STREAMING_SERVER_PORT} \ + NATS_CLUSTER_ID=provide \ + NATS_STREAMING_URL=$NATS_STREAMING_URL \ + NATS_STREAMING_CONCURRENCY=$NATS_STREAMING_CONCURRENCY \ + NATS_CONCURRENCY=$NATS_CONCURRENCY \ + GIN_MODE=release \ + DATABASE_HOST=localhost \ + DATABASE_NAME=${DATABASE_NAME} \ + DATABASE_USER=${DATABASE_USER} \ + DATABASE_PASSWORD=${DATABASE_PASSWORD} \ + LOG_LEVEL=DEBUG \ + REDIS_HOSTS=localhost:${REDIS_SERVER_PORT} \ + TEST_AWS_ACCESS_KEY_ID=${TEST_AWS_ACCESS_KEY_ID} \ + TEST_AWS_SECRET_ACCESS_KEY=${TEST_AWS_SECRET_ACCESS_KEY} \ + PAYMENTS_REFRESH_TOKEN=${PAYMENTS_REFRESH_TOKEN} \ + IDENT_API_HOST=${IDENT_API_HOST} \ + IDENT_API_SCHEME=${IDENT_API_SCHEME} \ + VAULT_API_HOST=${VAULT_API_HOST} \ + VAULT_API_SCHEME=${VAULT_API_SCHEME} \ + NCHAIN_API_HOST=${NCHAIN_API_HOST} \ + NCHAIN_API_SCHEME=http \ + VAULT_SEAL_UNSEAL_KEY=${VAULT_SEAL_UNSEAL_KEY} \ + VAULT_REFRESH_TOKEN=${VAULT_REFRESH_TOKEN} \ + go test "./${pkg}" -v \ + -race \ + -timeout 1800s \ + -cover \ + -coverpkg="./${pkg}" \ + -coverprofile=profile.out \ + -tags="$TAGS" + else + PGP_PUBLIC_KEY=$PGP_PUBLIC_KEY \ + PGP_PRIVATE_KEY=$PGP_PRIVATE_KEY \ + PGP_PASSPHRASE=$PGP_PASSPHRASE \ + NATS_TOKEN=testtoken \ + NATS_URL=nats://localhost:${NATS_SERVER_PORT} \ + NATS_STREAMING_URL=nats://localhost:${NATS_STREAMING_SERVER_PORT} \ + NATS_CLUSTER_ID=provide \ + NATS_STREAMING_URL=$NATS_STREAMING_URL \ + NATS_STREAMING_CONCURRENCY=$NATS_STREAMING_CONCURRENCY \ + NATS_CONCURRENCY=$NATS_CONCURRENCY \ + GIN_MODE=release \ + DATABASE_HOST=localhost \ + DATABASE_NAME=${DATABASE_NAME} \ + DATABASE_USER=${DATABASE_USER} \ + DATABASE_PASSWORD=${DATABASE_PASSWORD} \ + LOG_LEVEL=DEBUG \ + REDIS_HOSTS=localhost:${REDIS_SERVER_PORT} \ + TEST_AWS_ACCESS_KEY_ID=${TEST_AWS_ACCESS_KEY_ID} \ + TEST_AWS_SECRET_ACCESS_KEY=${TEST_AWS_SECRET_ACCESS_KEY} \ + PAYMENTS_REFRESH_TOKEN=${PAYMENTS_REFRESH_TOKEN} \ + IDENT_API_HOST=${IDENT_API_HOST} \ + IDENT_API_SCHEME=${IDENT_API_SCHEME} \ + #VAULT_API_HOST=${VAULT_API_HOST} \ + #VAULT_API_SCHEME=${VAULT_API_SCHEME} \ + #NCHAIN_API_HOST=${NCHAIN_API_HOST} \ + #NCHAIN_API_SCHEME=${NCHAIN_API_SCHEME} \ + #VAULT_REFRESH_TOKEN=${VAULT_REFRESH_TOKEN} \ + #VAULT_SEAL_UNSEAL_KEY=${VAULT_SEAL_UNSEAL_KEY} \ + go test "./${pkg}" -v \ + -timeout 1800s \ + -cover \ + -coverpkg="./${pkg}" \ + -coverprofile=profile.out \ + -tags="$TAGS" + fi +done diff --git a/tx/consumer.go b/tx/consumer.go index 0d977c2d..b07aadb8 100644 --- a/tx/consumer.go +++ b/tx/consumer.go @@ -14,7 +14,7 @@ import ( stan "github.com/nats-io/stan.go" "github.com/provideplatform/nchain/common" "github.com/provideplatform/nchain/contract" - api "github.com/provideplatform/provide-go/api" + "github.com/provideplatform/provide-go/api" provide "github.com/provideplatform/provide-go/api/nchain" ) @@ -156,7 +156,7 @@ func consumeTxCreateMsg(msg *stan.Msg) { // processTxTransferMsg processes ETH value transfer NATS messages func processTxTransferMsg(msg *stan.Msg) { - common.Log.Debugf("Consuming %d-byte NATS tx message on subject: %s", msg.Size(), msg.Subject) + common.Log.Debugf("consuming %d-byte NATS tx message on subject: %s", msg.Size(), msg.Subject) db := dbconf.DatabaseConnection() @@ -164,7 +164,7 @@ func processTxTransferMsg(msg *stan.Msg) { // TODO alter this to support transfers tx, _, err := processNATSTxMsg(msg, db, false) if err != nil { - common.Log.Debugf("Error processing NATS %v message. Error: %s", msg.Subject, err.Error()) + common.Log.Debugf("error processing NATS %v message. error: %s", msg.Subject, err.Error()) natsutil.Nack(msg) return } @@ -172,7 +172,7 @@ func processTxTransferMsg(msg *stan.Msg) { common.Log.Debugf("ACK: Acking previously processed NATS msg seq: %v", msg.Sequence) err := msg.Ack() if err != nil { - common.Log.Debugf("ACK: Error acking previously processed NATS msg seq: %v. Error: %s", msg.Sequence, err.Error()) + common.Log.Debugf("ACK: error acking previously processed NATS msg seq: %v. error: %s", msg.Sequence, err.Error()) natsutil.AttemptNack(msg, txCreateMsgTimeout) } return @@ -185,12 +185,12 @@ func processTxTransferMsg(msg *stan.Msg) { common.Log.Debugf("ConsumeTxExecutionMsg, executed tx with ref: %s", *tx.Ref) } else { - errmsg := fmt.Sprintf("Failed to execute transaction; tx ref %s failed with %d error(s)", *tx.Ref, len(tx.Errors)) + errmsg := fmt.Sprintf("failed to execute transaction; tx ref %s failed with %d error(s)", *tx.Ref, len(tx.Errors)) for _, err := range tx.Errors { errmsg = fmt.Sprintf("%s\n\t%s", errmsg, *err.Message) } - common.Log.Debugf("Tx ref %s failed. Error: %s, Attempting nacking", *tx.Ref, errmsg) + common.Log.Debugf("Tx ref %s failed. error: %s, Attempting nacking", *tx.Ref, errmsg) natsutil.AttemptNack(msg, txMsgTimeout) } @@ -203,14 +203,14 @@ func processTxTransferMsg(msg *stan.Msg) { // it joins an ordered processing queue func processTxCreateMsg(msg *stan.Msg) { - common.Log.Debugf("Consuming %d-byte NATS tx message on subject: %s", msg.Size(), msg.Subject) + common.Log.Debugf("consuming %d-byte NATS tx message on subject: %s", msg.Size(), msg.Subject) db := dbconf.DatabaseConnection() // process msg data tx, contract, err := processNATSTxMsg(msg, db, true) if err != nil { - common.Log.Debugf("Error processing NATS %v message. Error: %s", msg.Subject, err.Error()) + common.Log.Debugf("error processing NATS %v message. error: %s", msg.Subject, err.Error()) natsutil.Nack(msg) return } @@ -218,8 +218,8 @@ func processTxCreateMsg(msg *stan.Msg) { common.Log.Debugf("ACK: Acking previously processed NATS msg seq: %v", msg.Sequence) err := msg.Ack() if err != nil { - common.Log.Debugf("ACK: Error acking previously processed NATS msg seq: %v. Error: %s", msg.Sequence, err.Error()) - //common.Log.Debugf("ACK: Error acking tx ref %s. Error: %s", *tx.Ref, err.Error()) + common.Log.Debugf("ACK: error acking previously processed NATS msg seq: %v. error: %s", msg.Sequence, err.Error()) + //common.Log.Debugf("ACK: error acking tx ref %s. error: %s", *tx.Ref, err.Error()) natsutil.AttemptNack(msg, txCreateMsgTimeout) } return @@ -238,12 +238,12 @@ func processTxCreateMsg(msg *stan.Msg) { } else { - errmsg := fmt.Sprintf("Failed to execute transaction; tx ref %s failed with %d error(s)", *tx.Ref, len(tx.Errors)) + errmsg := fmt.Sprintf("failed to execute transaction; tx ref %s failed with %d error(s)", *tx.Ref, len(tx.Errors)) for _, err := range tx.Errors { errmsg = fmt.Sprintf("%s\n\t%s", errmsg, *err.Message) } - common.Log.Debugf("Tx ref %s failed. Error: %s, Attempting nacking", *tx.Ref, errmsg) + common.Log.Debugf("Tx ref %s failed. error: %s, Attempting nacking", *tx.Ref, errmsg) natsutil.AttemptNack(msg, txCreateMsgTimeout) } @@ -257,8 +257,8 @@ func processNATSTxMsg(msg *stan.Msg, db *gorm.DB, newContract bool) (*Transactio err := json.Unmarshal(msg.Data, ¶ms) if err != nil { - common.Log.Warningf("Failed to unmarshal tx creation message; %s", err.Error()) - err := fmt.Errorf("error unmarshaling tx creation message from NATS %v message. Error: %s", msg.Subject, err.Error()) + common.Log.Warningf("failed to unmarshal tx creation message; %s", err.Error()) + err := fmt.Errorf("error unmarshaling tx creation message from NATS %v message. error: %s", msg.Subject, err.Error()) return nil, nil, err } @@ -287,8 +287,8 @@ func processNATSTxMsg(msg *stan.Msg, db *gorm.DB, newContract bool) (*Transactio // no reference provided with the contract, so we'll make one reference, err = uuid.NewV4() if err != nil { - common.Log.Warningf("Failed to create unique tx ref. Error: %s", err.Error()) - err = fmt.Errorf("error creating unique ref for tx. Error: %s", err.Error()) + common.Log.Warningf("failed to create unique tx ref. error: %s", err.Error()) + err = fmt.Errorf("error creating unique ref for tx. error: %s", err.Error()) return nil, nil, err } } @@ -302,32 +302,32 @@ func processNATSTxMsg(msg *stan.Msg, db *gorm.DB, newContract bool) (*Transactio } if !contractIDOk { - common.Log.Warningf("Failed to unmarshal contract_id during NATS %v message handling", msg.Subject) + common.Log.Warningf("failed to unmarshal contract_id during NATS %v message handling", msg.Subject) err := fmt.Errorf("error unmarshaling contract_id from NATS %v message", msg.Subject) return nil, nil, err } if !dataOk { - common.Log.Warningf("Failed to unmarshal data during NATS %v message handling", msg.Subject) + common.Log.Warningf("failed to unmarshal data during NATS %v message handling", msg.Subject) err := fmt.Errorf("error unmarshaling data from NATS %v message", msg.Subject) return nil, nil, err } if !accountIDStrOk && !walletIDStrOk { - common.Log.Warningf("Failed to unmarshal account_id or wallet_id during NATS %v message handling", msg.Subject) + common.Log.Warningf("failed to unmarshal account_id or wallet_id during NATS %v message handling", msg.Subject) err := fmt.Errorf("error unmarshaling both accountID and walletID from NATS %v message", msg.Subject) return nil, nil, err } if !valueOk { - common.Log.Warningf("Failed to unmarshal value during NATS %v message handling", msg.Subject) + common.Log.Warningf("failed to unmarshal value during NATS %v message handling", msg.Subject) err := fmt.Errorf("error unmarshaling value from NATS %v message", msg.Subject) return nil, nil, err } if !paramsOk { - common.Log.Warningf("Failed to unmarshal params during NATS %v message handling", msg.Subject) + common.Log.Warningf("failed to unmarshal params during NATS %v message handling", msg.Subject) err := fmt.Errorf("error unmarshaling params from NATS %v message", msg.Subject) return nil, nil, err } if !publishedAtOk { - common.Log.Warningf("Failed to unmarshal published_at during NATS %v message handling", msg.Subject) + common.Log.Warningf("failed to unmarshal published_at during NATS %v message handling", msg.Subject) err := fmt.Errorf("error unmarshaling published_at from NATS %v message", msg.Subject) return nil, nil, err } @@ -349,7 +349,7 @@ func processNATSTxMsg(msg *stan.Msg, db *gorm.DB, newContract bool) (*Transactio } if accountID == nil && walletID == nil { - common.Log.Warningf("Failed to unmarshal account_id or wallet_id during NATS %v message handling", msg.Subject) + common.Log.Warningf("failed to unmarshal account_id or wallet_id during NATS %v message handling", msg.Subject) err := fmt.Errorf("error converting accountID and walletID to uuid from NATS %v message", msg.Subject) return nil, nil, err } @@ -374,7 +374,7 @@ func processNATSTxMsg(msg *stan.Msg, db *gorm.DB, newContract bool) (*Transactio publishedAtTime, err := time.Parse(time.RFC3339, publishedAt) if err != nil { - common.Log.Warningf("Failed to parse published_at as RFC3339 timestamp during NATS %v message handling; %s", msg.Subject, err.Error()) + common.Log.Warningf("failed to parse published_at as RFC3339 timestamp during NATS %v message handling; %s", msg.Subject, err.Error()) err := fmt.Errorf("error parsing published_at time from NATS %v message", msg.Subject) return nil, nil, err } @@ -382,7 +382,7 @@ func processNATSTxMsg(msg *stan.Msg, db *gorm.DB, newContract bool) (*Transactio // convert value to float64 (shouldn't this be uint64?) if it's present valueFloat, valueFloatOk := value.(float64) if !valueFloatOk { - common.Log.Warningf("Failed to unmarshal value during NATS %v message handling", msg.Subject) + common.Log.Warningf("failed to unmarshal value during NATS %v message handling", msg.Subject) err := fmt.Errorf("error converting value to float64 from NATS %v message", msg.Subject) return nil, nil, err } @@ -450,7 +450,7 @@ func processNATSTxMsg(msg *stan.Msg, db *gorm.DB, newContract bool) (*Transactio // replaces this tx with the db tx (if it exists and matches) replacementTx, err := tx.replaceWithDatabaseTxIfExists(db, ref) if err != nil { - common.Log.Debugf("Error retrieving tx ref %s from database", *tx.Ref) + common.Log.Debugf("error retrieving tx ref %s from database", *tx.Ref) return nil, nil, err } @@ -552,48 +552,44 @@ func (tx *Transaction) replaceWithDatabaseTxIfExists(db *gorm.DB, ref string) (* func consumeTxExecutionMsg(msg *stan.Msg) { start := time.Now() - common.Log.Debugf("TIMINGNANO: about to process nats sequence %v", msg.Sequence) + common.Log.Tracef("processing tx message; sequence: %v", msg.Sequence) processTxExecutionMsg(msg) - elapsedTime := time.Since(start) - common.Log.Debugf("TIMINGNANO: processed nats sequence %v in %s", msg.Sequence, elapsedTime) + common.Log.Tracef("processed tx message; sequence: %v in %s", msg.Sequence, time.Since(start)) } func processTxExecutionMsg(msg *stan.Msg) { - - common.Log.Debugf("Consuming %d-byte NATS tx message on subject: %s", msg.Size(), msg.Subject) - + common.Log.Debugf("consuming %d-byte NATS tx message on subject: %s", msg.Size(), msg.Subject) db := dbconf.DatabaseConnection() // process msg data tx, _, err := processNATSTxMsg(msg, db, false) if err != nil { - common.Log.Debugf("Error processing NATS %v message. Error: %s", msg.Subject, err.Error()) + common.Log.Debugf("error processing NATS %v message. error: %s", msg.Subject, err.Error()) natsutil.Nack(msg) return } if tx == nil { - common.Log.Debugf("ACK: Acking previously processed NATS msg seq: %v", msg.Sequence) + common.Log.Debugf("ACK: acking previously processed NATS msg seq: %v", msg.Sequence) err := msg.Ack() if err != nil { - common.Log.Debugf("ACK: Error acking previously processed NATS msg seq: %v. Error: %s", msg.Sequence, err.Error()) + common.Log.Debugf("ACK: error acking previously processed NATS msg seq: %v. error: %s", msg.Sequence, err.Error()) natsutil.AttemptNack(msg, txCreateMsgTimeout) } return } - common.Log.Debugf("ConsumeTxExecutionMsg, about to execute tx with ref: %s", *tx.Ref) + common.Log.Tracef("attempting to execute tx with ref: %s", *tx.Ref) // check if we have this tx ref in the database if tx.Create(db) { common.Log.Debugf("ConsumeTxExecutionMsg, executed tx with ref: %s", *tx.Ref) } else { - - errmsg := fmt.Sprintf("Failed to execute transaction; tx ref %s failed with %d error(s)", *tx.Ref, len(tx.Errors)) + errmsg := fmt.Sprintf("failed to execute transaction; tx ref %s failed with %d error(s)", *tx.Ref, len(tx.Errors)) for _, err := range tx.Errors { errmsg = fmt.Sprintf("%s\n\t%s", errmsg, *err.Message) } - common.Log.Debugf("Tx ref %s failed. Error: %s, Attempting nacking", *tx.Ref, errmsg) + common.Log.Debugf("Tx ref %s failed. error: %s, Attempting nacking", *tx.Ref, errmsg) natsutil.AttemptNack(msg, txMsgTimeout) } @@ -603,13 +599,13 @@ func processTxExecutionMsg(msg *stan.Msg) { // TODO: consider batching this using a buffered channel for high-volume networks func consumeTxFinalizeMsg(msg *stan.Msg) { - common.Log.Tracef("Consuming NATS tx finalize message: %s", msg) + common.Log.Tracef("consuming NATS tx finalize message: %s", msg) var params map[string]interface{} nack := func(msg *stan.Msg, errmsg string, dropPacket bool) { if dropPacket { - common.Log.Tracef("Dropping tx packet (seq: %d) on the floor; %s", msg.Sequence, errmsg) + common.Log.Tracef("dropping tx packet on the floor; seq: %d; %s", msg.Sequence, errmsg) msg.Ack() return } @@ -618,7 +614,7 @@ func consumeTxFinalizeMsg(msg *stan.Msg) { err := json.Unmarshal(msg.Data, ¶ms) if err != nil { - nack(msg, fmt.Sprintf("Failed to unmarshal tx finalize message; %s", err.Error()), true) + nack(msg, fmt.Sprintf("failed to unmarshal tx finalize message; %s", err.Error()), true) return } @@ -628,32 +624,32 @@ func consumeTxFinalizeMsg(msg *stan.Msg) { hash, hashOk := params["hash"].(string) if !blockOk { - nack(msg, "Failed to finalize tx; no block provided", true) + nack(msg, "failed to finalize tx; no block provided", true) return } if !blockTimestampStrOk { - nack(msg, "Failed to finalize tx; no block timestamp provided", true) + nack(msg, "failed to finalize tx; no block timestamp provided", true) return } if !finalizedAtStrOk { - nack(msg, "Failed to finalize tx; no finalized at timestamp provided", true) + nack(msg, "failed to finalize tx; no finalized at timestamp provided", true) return } if !hashOk { - nack(msg, "Failed to finalize tx; no hash provided", true) + nack(msg, "failed to finalize tx; no hash provided", true) return } blockTimestamp, err := time.Parse(time.RFC3339, blockTimestampStr) if err != nil { - nack(msg, fmt.Sprintf("Failed to unmarshal block_timestamp during NATS %v message handling; %s", msg.Subject, err.Error()), true) + nack(msg, fmt.Sprintf("failed to unmarshal block_timestamp during NATS %v message handling; %s", msg.Subject, err.Error()), true) return } finalizedAt, err := time.Parse(time.RFC3339, finalizedAtStr) if err != nil { - nack(msg, fmt.Sprintf("Failed to unmarshal finalized_at during NATS %v message handling; %s", msg.Subject, err.Error()), true) + nack(msg, fmt.Sprintf("failed to unmarshal finalized_at during NATS %v message handling; %s", msg.Subject, err.Error()), true) return } @@ -663,7 +659,7 @@ func consumeTxFinalizeMsg(msg *stan.Msg) { db.Where("hash = ? AND status IN (?, ?, ?, ?)", hash, "ready", "broadcast", "pending", "failed").Find(&tx) if tx == nil || tx.ID == uuid.Nil { // TODO: this is integration point to upsert Wallet & Transaction... need to think thru performance implications & implementation details - nack(msg, fmt.Sprintf("Failed to mark block and finalized_at timestamp on tx: %s; tx not found for given hash", hash), true) + nack(msg, fmt.Sprintf("failed to mark block and finalized_at timestamp on tx: %s; tx not found for given hash", hash), true) return } @@ -694,10 +690,10 @@ func consumeTxFinalizeMsg(msg *stan.Msg) { Message: common.StringOrNil(err.Error()), }) } - common.Log.Debugf("IDEMPOTENT: error updating tx ref %s. Error: %s", *tx.Ref, err.Error()) + common.Log.Warningf("error updating tx; ref: %s; error: %s", *tx.Ref, err.Error()) } if len(tx.Errors) > 0 { - nack(msg, fmt.Sprintf("Failed to set block and finalized_at timestamp on tx: %s; error: %s", hash, *tx.Errors[0].Message), false) + nack(msg, fmt.Sprintf("failed to set block and finalized_at timestamp on tx: %s; error: %s", hash, *tx.Errors[0].Message), false) return } @@ -705,7 +701,6 @@ func consumeTxFinalizeMsg(msg *stan.Msg) { } func processTxReceipt(msg *stan.Msg, tx *Transaction, key *string, db *gorm.DB) { - signer, err := tx.signerFactory(db) if err != nil { desc := "failed to resolve tx signing account or HD wallet" @@ -718,18 +713,17 @@ func processTxReceipt(msg *stan.Msg, tx *Transaction, key *string, db *gorm.DB) address, err := signer.Address() if err != nil { // let's try processing this one again - common.Log.Debugf(fmt.Sprintf("Failed to get account address from signer for tx ref %s. Error: %s", *tx.Ref, err.Error())) + common.Log.Debugf(fmt.Sprintf("failed to get account address from signer for tx ref %s. error: %s", *tx.Ref, err.Error())) natsutil.AttemptNack(msg, txReceiptMsgTimeout) return } err = tx.fetchReceipt(db, signer.Network, *address) if err != nil { - // TODO got a panic here on *tx.hash (removed temporarily) - common.Log.Debugf(fmt.Sprintf("Failed to fetch tx receipt for tx ref %s. Error: %s", *tx.Ref, err.Error())) + common.Log.Debugf(fmt.Sprintf("failed to fetch tx receipt for tx ref %s. error: %s", *tx.Ref, err.Error())) natsutil.AttemptNack(msg, txReceiptMsgTimeout) } else { - common.Log.Debugf("Fetched tx receipt for tx ref %s, hash: %s", *tx.Ref, *tx.Hash) + common.Log.Debugf("fetched tx receipt for tx ref %s, hash: %s", *tx.Ref, *tx.Hash) blockNumber := tx.Response.Receipt.(*provide.TxReceipt).BlockNumber // if we have a block number in the receipt, and the tx has no block @@ -753,7 +747,7 @@ func processTxReceipt(msg *stan.Msg, tx *Transaction, key *string, db *gorm.DB) tx.NetworkLatency = &networkLatency } - common.Log.Debugf("*** tx hash %s finalized in block %v at %s", *tx.Hash, blockNumber, receiptFinalized.Format("Mon, 02 Jan 2006 15:04:05 MST")) + common.Log.Tracef("tx %s finalized in block %v at %s", *tx.Hash, blockNumber, receiptFinalized.Format("Mon, 02 Jan 2006 15:04:05 MST")) } tx.updateStatus(db, "success", nil) msg.Ack() @@ -761,8 +755,6 @@ func processTxReceipt(msg *stan.Msg, tx *Transaction, key *string, db *gorm.DB) } func consumeTxReceiptMsg(msg *stan.Msg) { - var key *string - defer func() { if r := recover(); r != nil { common.Log.Warningf("recovered from failed tx receipt message; %s", r) @@ -770,9 +762,10 @@ func consumeTxReceiptMsg(msg *stan.Msg) { } }() - common.Log.Debugf("consuming NATS tx receipt message: %s", msg) + common.Log.Tracef("consuming NATS tx receipt message: %s", msg) var params map[string]interface{} + var key *string err := json.Unmarshal(msg.Data, ¶ms) if err != nil { @@ -788,7 +781,7 @@ func consumeTxReceiptMsg(msg *stan.Msg) { return } - common.Log.Debugf("Consuming tx receipt for tx id: %s", transactionID) + common.Log.Debugf("consuming tx receipt for tx id: %s", transactionID) db := dbconf.DatabaseConnection() tx := &Transaction{} @@ -799,9 +792,6 @@ func consumeTxReceiptMsg(msg *stan.Msg) { return } - common.Log.Debugf("Starting processTxReceipt for tx ref: %s", *tx.Ref) - // TODO occasionally throws panic on startup - // caused by vault not being available, so the signer.address is not populated (:803-ish) - // process the receipts asynchronously + common.Log.Tracef("attempting to asynchronously process tx receipt; ref: %s", *tx.Ref) go processTxReceipt(msg, tx, key, db) } diff --git a/tx/shuttle_consumer.go b/tx/shuttle_consumer.go index de12186b..b0ec9f10 100644 --- a/tx/shuttle_consumer.go +++ b/tx/shuttle_consumer.go @@ -68,13 +68,13 @@ func createNatsShuttleContractDeployedSubject(wg *sync.WaitGroup) { } func consumeShuttleCircuitDeployedMsg(msg *stan.Msg) { - common.Log.Debugf("Consuming NATS shuttle circuit deployed message: %s", msg) + common.Log.Debugf("consuming NATS shuttle circuit deployed message: %s", msg) var params map[string]interface{} err := json.Unmarshal(msg.Data, ¶ms) if err != nil { - common.Log.Warningf("Failed to umarshal shuttle circuit deployed message; %s", err.Error()) + common.Log.Warningf("failed to umarshal shuttle circuit deployed message; %s", err.Error()) natsutil.Nack(msg) return } @@ -84,13 +84,13 @@ func consumeShuttleCircuitDeployedMsg(msg *stan.Msg) { } func consumeShuttleContractDeployedMsg(msg *stan.Msg) { - common.Log.Debugf("Consuming NATS shuttle contract deployed message: %s", msg) + common.Log.Debugf("consuming NATS shuttle contract deployed message: %s", msg) var params map[string]interface{} err := json.Unmarshal(msg.Data, ¶ms) if err != nil { - common.Log.Warningf("Failed to umarshal shuttle contract deployed message; %s", err.Error()) + common.Log.Warningf("failed to umarshal shuttle contract deployed message; %s", err.Error()) natsutil.Nack(msg) return } @@ -103,31 +103,31 @@ func consumeShuttleContractDeployedMsg(msg *stan.Msg) { contractType, _ := params["type"].(string) if !addressOk { - common.Log.Warning("Failed to handle shuttle.contract.deployed message; contract address required") + common.Log.Warning("failed to handle shuttle.contract.deployed message; contract address required") natsutil.Nack(msg) return } if !byOk { - common.Log.Warning("Failed to handle shuttle.contract.deployed message; by address required") + common.Log.Warning("failed to handle shuttle.contract.deployed message; by address required") natsutil.Nack(msg) return } if !networkIDOk { - common.Log.Warning("Failed to handle shuttle.contract.deployed message; contract network_id required") + common.Log.Warning("failed to handle shuttle.contract.deployed message; contract network_id required") natsutil.Nack(msg) return } if !nameOk { - common.Log.Warning("Failed to handle shuttle.contract.deployed message; contract name required") + common.Log.Warning("failed to handle shuttle.contract.deployed message; contract name required") natsutil.Nack(msg) return } if !txHashOk { - common.Log.Warning("Failed to handle shuttle.contract.deployed message; tx hash required") + common.Log.Warning("failed to handle shuttle.contract.deployed message; tx hash required") natsutil.Nack(msg) return } @@ -137,14 +137,14 @@ func consumeShuttleContractDeployedMsg(msg *stan.Msg) { db.Where("network_id = ? AND address = ?", networkID, byAddr).Find(&cntrct) if cntrct == nil || cntrct.ID == uuid.Nil { - common.Log.Warningf("Failed to handle shuttle.contract.deployed message; contract not resolved for address: %s", byAddr) + common.Log.Warningf("failed to handle shuttle.contract.deployed message; contract not resolved for address: %s", byAddr) natsutil.AttemptNack(msg, natsShuttleContractDeployedTimeout) return } network, err := cntrct.GetNetwork() if err != nil { - common.Log.Warningf("Failed to handle shuttle.contract.deployed message; network not resolved for contract with address: %s; %s", byAddr, err.Error()) + common.Log.Warningf("failed to handle shuttle.contract.deployed message; network not resolved for contract with address: %s; %s", byAddr, err.Error()) natsutil.AttemptNack(msg, natsShuttleContractDeployedTimeout) return } @@ -156,21 +156,21 @@ func consumeShuttleContractDeployedMsg(msg *stan.Msg) { p2pAPI, err := network.P2PAPIClient() if err != nil { - common.Log.Warningf("Failed to handle shuttle.contract.deployed message; network P2P API client not resolved for contract with address: %s; %s", byAddr, err.Error()) + common.Log.Warningf("failed to handle shuttle.contract.deployed message; network P2P API client not resolved for contract with address: %s; %s", byAddr, err.Error()) natsutil.AttemptNack(msg, natsShuttleContractDeployedTimeout) return } receipt, err := p2pAPI.FetchTxReceipt(*cntrct.Address, txHash) if err != nil { - common.Log.Warningf("Failed to handle shuttle.contract.deployed message; failed to fetch tx receipt for contract with address: %s; %s", byAddr, err.Error()) + common.Log.Warningf("failed to handle shuttle.contract.deployed message; failed to fetch tx receipt for contract with address: %s; %s", byAddr, err.Error()) natsutil.AttemptNack(msg, natsShuttleContractDeployedTimeout) return } dependency := cntrct.ResolveCompiledDependencyArtifact(name) if dependency == nil { - common.Log.Warningf("Failed to handle shuttle.contract.deployed message; contract at address %s unable to resolved dependency: %s", byAddr, name) + common.Log.Warningf("failed to handle shuttle.contract.deployed message; contract at address %s unable to resolved dependency: %s", byAddr, name) natsutil.AttemptNack(msg, natsShuttleContractDeployedTimeout) return } @@ -198,6 +198,12 @@ func consumeShuttleContractDeployedMsg(msg *stan.Msg) { func(c *contract.Contract, tokenType, name string, decimals *big.Int, symbol string) (createdToken bool, tokenID uuid.UUID, errs []*provide.Error) { common.Log.Debugf("resolved %s token: %s (%v decimals); symbol: %s", *network.Name, name, decimals, symbol) + var address *string + if network.IsEthereumNetwork() { + ethCommonAddress := ethcommon.BytesToAddress(receipt.ContractAddress) + address = common.StringOrNil(ethCommonAddress.Hex()) + } + tok := &token.Token{ ApplicationID: c.ApplicationID, NetworkID: c.NetworkID, @@ -206,7 +212,7 @@ func consumeShuttleContractDeployedMsg(msg *stan.Msg) { Name: common.StringOrNil(name), Symbol: common.StringOrNil(symbol), Decimals: decimals.Uint64(), - Address: common.StringOrNil(receipt.ContractAddress.Hex()), + Address: address, } createdToken = tok.Create() diff --git a/tx/tx.go b/tx/tx.go index 35fe8089..c9a630e8 100644 --- a/tx/tx.go +++ b/tx/tx.go @@ -1346,9 +1346,14 @@ func (t *Transaction) handleTxReceipt( receipt *provideapi.TxReceipt, ) error { if t.To == nil { - common.Log.Debugf("Retrieved tx receipt for %s contract creation tx: %s; deployed contract address: %s", *network.Name, *t.Hash, receipt.ContractAddress.Hex()) + var contractAddress string + if network.IsEthereumNetwork() { + ethCommonAddress := ethcommon.BytesToAddress(receipt.ContractAddress) + contractAddress = *common.StringOrNil(ethCommonAddress.Hex()) + } + common.Log.Debugf("Retrieved tx receipt for %s contract creation tx: %s; deployed contract address: %s", *network.Name, *t.Hash, contractAddress) params := t.ParseParams() - contractName := fmt.Sprintf("Contract %s", *common.StringOrNil(receipt.ContractAddress.Hex())) + contractName := fmt.Sprintf("Contract %s", *common.StringOrNil(contractAddress)) if name, ok := params["name"].(string); ok { contractName = name } @@ -1367,7 +1372,7 @@ func (t *Transaction) handleTxReceipt( Name: common.StringOrNil(name), Symbol: common.StringOrNil(symbol), Decimals: decimals.Uint64(), - Address: common.StringOrNil(receipt.ContractAddress.Hex()), + Address: common.StringOrNil(contractAddress), } createdToken = tok.Create() @@ -1387,7 +1392,7 @@ func (t *Transaction) handleTxReceipt( NetworkID: t.NetworkID, TransactionID: &t.ID, Name: common.StringOrNil(contractName), - Address: common.StringOrNil(receipt.ContractAddress.Hex()), + Address: common.StringOrNil(contractAddress), Params: t.Params, Ref: t.Ref, } @@ -1399,7 +1404,7 @@ func (t *Transaction) handleTxReceipt( } } else { common.Log.Debugf("Using previously created contract %s for %s contract creation tx: %s, txID: %s", kontract.ID, *network.Name, *t.Hash, t.ID) - kontract.Address = common.StringOrNil(receipt.ContractAddress.Hex()) + kontract.Address = common.StringOrNil(contractAddress) db.Save(&kontract) common.Log.Debugf("Updated contract with address for txID: %s", t.ID) kontract.ResolveTokenContract(db, network, signerAddress, receipt, tokenCreateFn) @@ -1470,7 +1475,11 @@ func (t *Transaction) handleTxTraces( internalContract.ResolveTokenContract(db, network, signerAddress, receipt, func(c *contract.Contract, tokenType, name string, decimals *big.Int, symbol string) (createdToken bool, tokenID uuid.UUID, errs []*provide.Error) { common.Log.Debugf("Resolved %s token: %s (%v decimals); symbol: %s", *network.Name, name, decimals, symbol) - + var contractAddress *string + if network.IsEthereumNetwork() { + ethCommonAddress := ethcommon.BytesToAddress(receipt.ContractAddress) + contractAddress = common.StringOrNil(ethCommonAddress.Hex()) + } tok := &token.Token{ ApplicationID: c.ApplicationID, OrganizationID: c.OrganizationID, @@ -1480,7 +1489,7 @@ func (t *Transaction) handleTxTraces( Name: common.StringOrNil(name), Symbol: common.StringOrNil(symbol), Decimals: decimals.Uint64(), - Address: common.StringOrNil(receipt.ContractAddress.Hex()), + Address: contractAddress, } createdToken = tok.Create()