Skip to content

Example Go code for subscribing to the block stream and parsing transactions

alexjlan edited this page May 4, 2020 · 2 revisions

// Command main is an example that connects to a OneLedger full node over the
// Tendermint RPC websocket, subscribes to NewBlock events, and logs every
// transaction in each block that can be decoded as a transfer.Send.
package main

import (
	"context"
	"errors"
	"log"

	"github.com/Oneledger/explorer/common"
	"github.com/Oneledger/protocol/action"
	"github.com/Oneledger/protocol/action/transfer"
	"github.com/Oneledger/protocol/serialize"
	tmrpc "github.com/tendermint/tendermint/rpc/client"
	ctypes "github.com/tendermint/tendermint/rpc/core/types"
	tmtypes "github.com/tendermint/tendermint/types"
)

// Fee is the JSON view of the fee attached to a signed transaction.
type Fee struct {
	Amount string `json:"amount"`
	Gas    int64  `json:"gas"`
}

// Recipient is one destination account of a transfer and the amount sent to it.
type Recipient struct {
	Account string `json:"account"`
	Amount  string `json:"amount"`
}

// Transaction is the flattened, JSON-serializable form of a send transaction.
// BlockHeight, Hash and ChainID are not known to convertSendTx; the caller
// fills them in from the enclosing block.
type Transaction struct {
	Type        string          `json:"type"`
	BlockHeight int64           `json:"blockHeight"`
	Hash        common.HexBytes `json:"hash"`
	ChainID     string          `json:"chainID"`
	From        string          `json:"from"`
	Recipients  []Recipient     `json:"recipients,omitempty"`
	Fee         Fee             `json:"fee"`
	Memo        string          `json:"memo"`
}

// main connects to the node, subscribes to new blocks and logs their
// transactions until the event stream closes, at which point it panics with
// the reason reported by the watcher goroutine.
func main() {
	quit := make(chan string)

	// Oneledger fullnode rpc port
	tmAddress := "tcp://127.0.0.1:26600"
	tmClient, err := tmrpc.NewHTTP(tmAddress, "/websocket")
	if err != nil {
		// Without a client nothing below can work, so stop here.
		log.Fatalf("creating tendermint client for %s: %v", tmAddress, err)
	}

	// NOTE(review): OnStart is called directly here; Tendermint services are
	// normally started through Start() — confirm which one this client
	// version expects.
	err = tmClient.OnStart()
	if err != nil {
		log.Fatalf("starting tendermint client: %v", err)
	}
	log.Println("Successfully connected tendermint clients!")

	ctx := context.Background()
	blocksQuery := "tm.event = 'NewBlock'"
	blockEvents, err := tmClient.Subscribe(ctx, "tx-explorer-watcher", blocksQuery)
	if err != nil {
		// Ranging over a nil channel would block forever, so fail loudly.
		log.Fatalf("subscribing to %q: %v", blocksQuery, err)
	}

	// Listen for new blocks
	go func(blockEvents <-chan ctypes.ResultEvent) {
		for event := range blockEvents {
			b, ok := event.Data.(tmtypes.EventDataNewBlock)
			if !ok {
				// Log the type that actually arrived; b is only a zero value here.
				log.Printf("Incorrect block event type in blockStream goroutine: %T", event.Data)
				continue
			}
			logBlockTxs(b)
		}
		// The range loop only ends when the subscription channel is closed.
		quit <- "Failed to get tendermint event stream"
	}(blockEvents)

	reason := <-quit
	panic(reason)
}

// logBlockTxs decodes every transaction in the block carried by b and logs it.
// Transactions that cannot be decoded are logged and skipped so that one bad
// transaction does not hide the rest of the block.
func logBlockTxs(b tmtypes.EventDataNewBlock) {
	log.Println("Incoming block: ", b.Block.Height)
	for _, txn := range b.Block.Data.Txs {
		signedTx, err := ConvertFromBytes(txn)
		if err != nil {
			log.Printf("block %d: decoding signed tx: %v", b.Block.Height, err)
			continue
		}
		tx, err := convertSendTx(signedTx)
		if err != nil {
			log.Printf("block %d: converting tx: %v", b.Block.Height, err)
			continue
		}
		tx.BlockHeight = b.Block.Height
		tx.Hash = txn.Hash()
		tx.ChainID = b.Block.ChainID
		log.Println("tx: ", tx)
	}
}

// ConvertFromBytes deserializes raw transaction bytes into an action.SignedTx
// using the NETWORK serializer. On failure it returns a zero SignedTx and the
// serializer's error.
func ConvertFromBytes(tx []byte) (action.SignedTx, error) {
	var base action.SignedTx
	szr := serialize.GetSerializer(serialize.NETWORK)
	if err := szr.Deserialize(tx, &base); err != nil {
		return action.SignedTx{}, err
	}
	return base, nil
}

// convertSendTx deserializes the payload of base as a transfer.Send and
// flattens it, together with the fee and memo from base, into a Transaction
// with a single recipient.
//
// NOTE(review): the payload is decoded as a Send without first checking the
// transaction type; presumably other transaction types fail deserialization —
// confirm.
func convertSendTx(base action.SignedTx) (Transaction, error) {
	var send transfer.Send
	err := serialize.GetSerializer(serialize.NETWORK).Deserialize(base.Data, &send)
	if err != nil {
		// Keep the underlying cause so the log shows why decoding failed.
		return Transaction{}, errors.New("Failed to deserialize tx data: " + err.Error())
	}

	fee := base.Fee
	return Transaction{
		Type: send.Type().String(),
		Fee: Fee{
			Amount: fee.Price.String(),
			Gas:    fee.Gas,
		},
		From: send.From.Humanize(),
		Recipients: []Recipient{
			{Account: send.To.Humanize(), Amount: send.Amount.String()},
		},
		Memo: base.Memo,
	}, nil
}