Skip to content

Commit

Permalink
Use transaction results stream & state to get correct state of transa…
Browse files Browse the repository at this point in the history
…ction
  • Loading branch information
kacpersaw committed Dec 11, 2023
1 parent 6756311 commit c189a58
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 101 deletions.
20 changes: 11 additions & 9 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
const (
streamType_node_SyncStatus int = 1
//streamType_mesh_Layer int = 2
streamType_globalState int = 2
streamType_transactions int = 2
streamType_mesh_Malfeasance int = 3

streamType_count int = 3
Expand All @@ -34,7 +34,7 @@ type Listener interface {
OnAccounts(accounts []*types.Account)
OnReward(reward *pb.Reward)
OnMalfeasanceProof(proof *pb.MalfeasanceProof)
OnTransactionReceipt(receipt *pb.TransactionReceipt)
OnTransactionResult(res *pb.TransactionResult, state *pb.TransactionState)
GetLastLayer(parent context.Context) uint32
LayersInQueue() int
IsLayerInQueue(layer *pb.Layer) bool
Expand All @@ -51,11 +51,12 @@ type Collector struct {
db *sql2.Database
dbClient sql.DatabaseClient

nodeClient pb.NodeServiceClient
meshClient pb.MeshServiceClient
globalClient pb.GlobalStateServiceClient
debugClient pb.DebugServiceClient
smesherClient pb.SmesherServiceClient
nodeClient pb.NodeServiceClient
meshClient pb.MeshServiceClient
globalClient pb.GlobalStateServiceClient
transactionsClient pb.TransactionServiceClient
debugClient pb.DebugServiceClient
smesherClient pb.SmesherServiceClient

streams [streamType_count]bool
activeStreams int
Expand Down Expand Up @@ -110,6 +111,7 @@ func (c *Collector) Run() error {
c.nodeClient = pb.NewNodeServiceClient(publicConn)
c.meshClient = pb.NewMeshServiceClient(publicConn)
c.globalClient = pb.NewGlobalStateServiceClient(publicConn)
c.transactionsClient = pb.NewTransactionServiceClient(publicConn)
c.debugClient = pb.NewDebugServiceClient(publicConn)
c.smesherClient = pb.NewSmesherServiceClient(privateConn)

Expand All @@ -135,9 +137,9 @@ func (c *Collector) Run() error {
})

g.Go(func() error {
err := c.globalStatePump()
err := c.transactionsPump()
if err != nil {
return errors.Join(errors.New("cannot start sync global state pump"), err)
return errors.Join(errors.New("cannot start transactions pump"), err)
}
return nil
})
Expand Down
38 changes: 0 additions & 38 deletions collector/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
Expand All @@ -30,40 +29,3 @@ func (c *Collector) GetAccountState(address string) (uint64, uint64, error) {

return res.AccountWrapper.StateCurrent.Balance.Value, res.AccountWrapper.StateCurrent.Counter, nil
}

func (c *Collector) globalStatePump() error {
req := pb.GlobalStateStreamRequest{GlobalStateDataFlags: uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_REWARD) |
uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_TRANSACTION_RECEIPT) |
uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_ACCOUNT)}

log.Info("Start global state pump")
defer func() {
c.notify <- -streamType_globalState
log.Info("Stop global state pump")
}()

c.notify <- +streamType_globalState

stream, err := c.globalClient.GlobalStateStream(context.Background(), &req)
if err != nil {
log.Err(fmt.Errorf("cannot get global state account stream: %v", err))
return err
}

for {
response, err := stream.Recv()
if err == io.EOF {
return err
}
if err != nil {
log.Err(fmt.Errorf("cannot receive Global state data: %v", err))
return err
}
item := response.GetDatum()
if receipt := item.GetReceipt(); receipt != nil {
if receipt.Layer.Number > c.syncFromLayerFlag {
c.listener.OnTransactionReceipt(receipt)
}
}
}
}
57 changes: 57 additions & 0 deletions collector/txs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package collector

import (
"context"
"fmt"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/spacemeshos/go-spacemesh/log"
"io"
)

func (c *Collector) transactionsPump() error {
req := pb.TransactionResultsRequest{
Start: 1,
Watch: true,
}

log.Info("Start transactions pump")
defer func() {
c.notify <- -streamType_transactions
log.Info("Stop transactions pump")
}()

c.notify <- +streamType_transactions

stream, err := c.transactionsClient.StreamResults(context.Background(), &req)
if err != nil {
log.Err(fmt.Errorf("cannot get transactions stream results: %v", err))
return err
}

for {
response, err := stream.Recv()
if err == io.EOF {
return err
}
if err != nil {
log.Err(fmt.Errorf("cannot receive transaction result: %v", err))
return err
}
if response == nil {
continue
}

state, err := c.transactionsClient.TransactionsState(context.TODO(), &pb.TransactionsStateRequest{
TransactionId: []*pb.TransactionId{{Id: response.Tx.Id}},
IncludeTransactions: false,
})
if err != nil {
log.Err(fmt.Errorf("cannot receive transaction state: %v", err))
return err
}

if len(state.GetTransactionsState()) > 0 {
c.listener.OnTransactionResult(response, state.GetTransactionsState()[0])
}
}
}
59 changes: 25 additions & 34 deletions model/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,41 @@ type Transaction struct {
Sender string `json:"sender" bson:"sender"` // tx originator, should match signer inside Signature
Receiver string `json:"receiver" bson:"receiver"`
SvmData string `json:"svmData" bson:"svmData"` // svm binary data. Decode with svm-codec

Message string `json:"message" bson:"message"`
TouchedAddresses []string `json:"touchedAddresses" bson:"touchedAddresses"`
}

type TransactionReceipt struct {
Id string //nolint will fix it later
Layer uint32
Index uint32 // the index of the tx in the ordered list of txs to be executed by stf in the layer
Result int
GasUsed uint64 // gas units used by the transaction (gas price in tx)
Fee uint64 // transaction fee charged for the transaction
SvmData string // svm binary data. Decode with svm-codec
Id string //nolint will fix it later
Result int
Message string
GasUsed uint64 // gas units used by the transaction (gas price in tx)
Fee uint64 // transaction fee charged for the transaction
Layer uint32
Block string
TouchedAddresses []string
}

type TransactionService interface {
GetTransaction(ctx context.Context, txID string) (*Transaction, error)
GetTransactions(ctx context.Context, page, perPage int64) (txs []*Transaction, total int64, err error)
}

func NewTransactionReceipt(txReceipt *pb.TransactionReceipt) *TransactionReceipt {
return &TransactionReceipt{
Id: utils.BytesToHex(txReceipt.GetId().GetId()),
Result: int(txReceipt.GetResult()),
GasUsed: txReceipt.GetGasUsed(),
Fee: txReceipt.GetFee().GetValue(),
Layer: uint32(txReceipt.GetLayer().GetNumber()),
Index: txReceipt.GetIndex(),
SvmData: utils.BytesToHex(txReceipt.GetSvmData()),
func NewTransactionResult(res *pb.TransactionResult, state *pb.TransactionState, networkInfo NetworkInfo) (*Transaction, error) {
layerStart := networkInfo.GenesisTime + res.GetLayer()*networkInfo.LayerDuration
tx, err := NewTransaction(res.GetTx(), res.GetLayer(), utils.NBytesToHex(res.GetBlock(), 20), layerStart, 0)
if err != nil {
return nil, err
}

tx.State = int(state.State)
tx.Fee = res.GetFee()
tx.GasUsed = res.GetGasConsumed()
tx.Message = res.GetMessage()
tx.TouchedAddresses = res.GetTouchedAddresses()

return tx, nil
}

// NewTransaction try to parse the transaction and return a new Transaction struct.
Expand All @@ -73,7 +81,6 @@ func NewTransaction(in *pb.Transaction, layer uint32, blockID string, timestamp
if err != nil {
return nil, fmt.Errorf("failed to parse transaction: %w", err)
}

tx := &Transaction{
Id: utils.BytesToHex(in.GetId()),
Sender: txDecoded.GetPrincipal().String(),
Expand All @@ -82,7 +89,7 @@ func NewTransaction(in *pb.Transaction, layer uint32, blockID string, timestamp
Layer: layer,
Block: blockID,
BlockIndex: blockIndex,
State: int(pb.TransactionState_TRANSACTION_STATE_PROCESSED),
State: int(pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED),
Timestamp: timestamp,
MaxGas: in.GetMaxGas(),
GasPrice: txDecoded.GetGasPrice(),
Expand All @@ -99,19 +106,3 @@ func NewTransaction(in *pb.Transaction, layer uint32, blockID string, timestamp

return tx, nil
}

func GetTransactionStateFromResult(txResult int) int {
switch txResult {
case int(pb.TransactionReceipt_TRANSACTION_RESULT_EXECUTED):
return int(pb.TransactionState_TRANSACTION_STATE_PROCESSED)
case int(pb.TransactionReceipt_TRANSACTION_RESULT_BAD_COUNTER):
return int(pb.TransactionState_TRANSACTION_STATE_CONFLICTING)
case int(pb.TransactionReceipt_TRANSACTION_RESULT_RUNTIME_EXCEPTION):
return int(pb.TransactionState_TRANSACTION_STATE_REJECTED)
case int(pb.TransactionReceipt_TRANSACTION_RESULT_INSUFFICIENT_GAS):
return int(pb.TransactionState_TRANSACTION_STATE_INSUFFICIENT_FUNDS)
case int(pb.TransactionReceipt_TRANSACTION_RESULT_INSUFFICIENT_FUNDS):
return int(pb.TransactionState_TRANSACTION_STATE_INSUFFICIENT_FUNDS)
}
return int(pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED)
}
13 changes: 9 additions & 4 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,17 @@ func (s *Storage) OnReward(in *pb.Reward) {
s.updateEpochs() // trigger epoch stat recalculation todo: optimize this
}

func (s *Storage) OnTransactionReceipt(in *pb.TransactionReceipt) {
log.Info("OnTransactionReceipt(%+v)", in)
err := s.UpdateTransaction(context.Background(), model.NewTransactionReceipt(in))
func (s *Storage) OnTransactionResult(res *pb.TransactionResult, state *pb.TransactionState) {
log.Info("OnTransactionReceipt(%+v, %+v)", res, state)
tx, err := model.NewTransactionResult(res, state, s.NetworkInfo)
if err != nil {
log.Err(fmt.Errorf("OnTransactionResult: error %v", err))
}

err = s.SaveTransactionResult(context.Background(), tx)
//TODO: better error handling
if err != nil {
log.Err(fmt.Errorf("OnTransactionReceipt: error %v", err))
log.Err(fmt.Errorf("OnTransactionResult: error %v", err))
}
}

Expand Down
Loading

0 comments on commit c189a58

Please sign in to comment.