From c189a5854d449626f9613ed056a60a977577a59f Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Mon, 11 Dec 2023 17:07:04 +0100 Subject: [PATCH] Use transaction results stream & state to get correct state of transaction --- collector/collector.go | 20 ++++--- collector/global.go | 38 ------------- collector/txs.go | 57 +++++++++++++++++++ model/tx.go | 59 +++++++++----------- storage/storage.go | 13 +++-- storage/tx.go | 110 ++++++++++++++++++++++++++++++++----- test/testseed/generator.go | 2 +- 7 files changed, 198 insertions(+), 101 deletions(-) create mode 100644 collector/txs.go diff --git a/collector/collector.go b/collector/collector.go index 2ce50a0..30ff9dd 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -21,7 +21,7 @@ import ( const ( streamType_node_SyncStatus int = 1 //streamType_mesh_Layer int = 2 - streamType_globalState int = 2 + streamType_transactions int = 2 streamType_mesh_Malfeasance int = 3 streamType_count int = 3 @@ -34,7 +34,7 @@ type Listener interface { OnAccounts(accounts []*types.Account) OnReward(reward *pb.Reward) OnMalfeasanceProof(proof *pb.MalfeasanceProof) - OnTransactionReceipt(receipt *pb.TransactionReceipt) + OnTransactionResult(res *pb.TransactionResult, state *pb.TransactionState) GetLastLayer(parent context.Context) uint32 LayersInQueue() int IsLayerInQueue(layer *pb.Layer) bool @@ -51,11 +51,12 @@ type Collector struct { db *sql2.Database dbClient sql.DatabaseClient - nodeClient pb.NodeServiceClient - meshClient pb.MeshServiceClient - globalClient pb.GlobalStateServiceClient - debugClient pb.DebugServiceClient - smesherClient pb.SmesherServiceClient + nodeClient pb.NodeServiceClient + meshClient pb.MeshServiceClient + globalClient pb.GlobalStateServiceClient + transactionsClient pb.TransactionServiceClient + debugClient pb.DebugServiceClient + smesherClient pb.SmesherServiceClient streams [streamType_count]bool activeStreams int @@ -110,6 +111,7 @@ func (c *Collector) Run() error { c.nodeClient = pb.NewNodeServiceClient(publicConn) c.meshClient = pb.NewMeshServiceClient(publicConn) c.globalClient = pb.NewGlobalStateServiceClient(publicConn) + c.transactionsClient = pb.NewTransactionServiceClient(publicConn) c.debugClient = pb.NewDebugServiceClient(publicConn) c.smesherClient = pb.NewSmesherServiceClient(privateConn) @@ -135,9 +137,9 @@ func (c *Collector) Run() error { }) g.Go(func() error { - err := c.globalStatePump() + err := c.transactionsPump() if err != nil { - return errors.Join(errors.New("cannot start sync global state pump"), err) + return errors.Join(errors.New("cannot start transactions pump"), err) } return nil }) diff --git a/collector/global.go b/collector/global.go index 64c4825..67227e1 100644 --- a/collector/global.go +++ b/collector/global.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "time" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" @@ -30,40 +29,3 @@ func (c *Collector) GetAccountState(address string) (uint64, uint64, error) { return res.AccountWrapper.StateCurrent.Balance.Value, res.AccountWrapper.StateCurrent.Counter, nil } - -func (c *Collector) globalStatePump() error { - req := pb.GlobalStateStreamRequest{GlobalStateDataFlags: uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_REWARD) | - uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_TRANSACTION_RECEIPT) | - uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_ACCOUNT)} - - log.Info("Start global state pump") - defer func() { - c.notify <- -streamType_globalState - log.Info("Stop global state pump") - }() - - c.notify <- +streamType_globalState - - stream, err := c.globalClient.GlobalStateStream(context.Background(), &req) - if err != nil { - log.Err(fmt.Errorf("cannot get global state account stream: %v", err)) - return err - } - - for { - response, err := stream.Recv() - if err == io.EOF { - return err - } - if err != nil { - log.Err(fmt.Errorf("cannot receive Global state data: %v", err)) - return err - } - item := response.GetDatum() - if receipt := item.GetReceipt(); receipt != nil { - if receipt.Layer.Number > c.syncFromLayerFlag { - c.listener.OnTransactionReceipt(receipt) - } - } - } -} diff --git a/collector/txs.go b/collector/txs.go new file mode 100644 index 0000000..2300e45 --- /dev/null +++ b/collector/txs.go @@ -0,0 +1,57 @@ +package collector + +import ( + "context" + "fmt" + pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "github.com/spacemeshos/go-spacemesh/log" + "io" +) + +func (c *Collector) transactionsPump() error { + req := pb.TransactionResultsRequest{ + Start: 1, + Watch: true, + } + + log.Info("Start transactions pump") + defer func() { + c.notify <- -streamType_transactions + log.Info("Stop transactions pump") + }() + + c.notify <- +streamType_transactions + + stream, err := c.transactionsClient.StreamResults(context.Background(), &req) + if err != nil { + log.Err(fmt.Errorf("cannot get transactions stream results: %v", err)) + return err + } + + for { + response, err := stream.Recv() + if err == io.EOF { + return err + } + if err != nil { + log.Err(fmt.Errorf("cannot receive transaction result: %v", err)) + return err + } + if response == nil { + continue + } + + state, err := c.transactionsClient.TransactionsState(context.TODO(), &pb.TransactionsStateRequest{ + TransactionId: []*pb.TransactionId{{Id: response.Tx.Id}}, + IncludeTransactions: false, + }) + if err != nil { + log.Err(fmt.Errorf("cannot receive transaction state: %v", err)) + return err + } + + if len(state.GetTransactionsState()) > 0 { + c.listener.OnTransactionResult(response, state.GetTransactionsState()[0]) + } + } +} diff --git a/model/tx.go b/model/tx.go index 3311695..ad57956 100644 --- a/model/tx.go +++ b/model/tx.go @@ -38,16 +38,20 @@ type Transaction struct { Sender string `json:"sender" bson:"sender"` // tx originator, should match signer inside Signature Receiver string `json:"receiver" bson:"receiver"` SvmData string `json:"svmData" bson:"svmData"` // svm binary data. Decode with svm-codec + + Message string `json:"message" bson:"message"` + TouchedAddresses []string `json:"touchedAddresses" bson:"touchedAddresses"` } type TransactionReceipt struct { - Id string //nolint will fix it later - Layer uint32 - Index uint32 // the index of the tx in the ordered list of txs to be executed by stf in the layer - Result int - GasUsed uint64 // gas units used by the transaction (gas price in tx) - Fee uint64 // transaction fee charged for the transaction - SvmData string // svm binary data. Decode with svm-codec + Id string //nolint will fix it later + Result int + Message string + GasUsed uint64 // gas units used by the transaction (gas price in tx) + Fee uint64 // transaction fee charged for the transaction + Layer uint32 + Block string + TouchedAddresses []string } type TransactionService interface { @@ -55,16 +59,20 @@ type TransactionService interface { GetTransactions(ctx context.Context, page, perPage int64) (txs []*Transaction, total int64, err error) } -func NewTransactionReceipt(txReceipt *pb.TransactionReceipt) *TransactionReceipt { - return &TransactionReceipt{ - Id: utils.BytesToHex(txReceipt.GetId().GetId()), - Result: int(txReceipt.GetResult()), - GasUsed: txReceipt.GetGasUsed(), - Fee: txReceipt.GetFee().GetValue(), - Layer: uint32(txReceipt.GetLayer().GetNumber()), - Index: txReceipt.GetIndex(), - SvmData: utils.BytesToHex(txReceipt.GetSvmData()), +func NewTransactionResult(res *pb.TransactionResult, state *pb.TransactionState, networkInfo NetworkInfo) (*Transaction, error) { + layerStart := networkInfo.GenesisTime + res.GetLayer()*networkInfo.LayerDuration + tx, err := NewTransaction(res.GetTx(), res.GetLayer(), utils.NBytesToHex(res.GetBlock(), 20), layerStart, 0) + if err != nil { + return nil, err } + + tx.State = int(state.State) + tx.Fee = res.GetFee() + tx.GasUsed = res.GetGasConsumed() + tx.Message = res.GetMessage() + tx.TouchedAddresses = res.GetTouchedAddresses() + + return tx, nil } // NewTransaction try to parse the transaction and return a new Transaction struct. @@ -73,7 +81,6 @@ func NewTransaction(in *pb.Transaction, layer uint32, blockID string, timestamp if err != nil { return nil, fmt.Errorf("failed to parse transaction: %w", err) } - tx := &Transaction{ Id: utils.BytesToHex(in.GetId()), Sender: txDecoded.GetPrincipal().String(), @@ -82,7 +89,7 @@ func NewTransaction(in *pb.Transaction, layer uint32, blockID string, timestamp Layer: layer, Block: blockID, BlockIndex: blockIndex, - State: int(pb.TransactionState_TRANSACTION_STATE_PROCESSED), + State: int(pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED), Timestamp: timestamp, MaxGas: in.GetMaxGas(), GasPrice: txDecoded.GetGasPrice(), @@ -99,19 +106,3 @@ func NewTransaction(in *pb.Transaction, layer uint32, blockID string, timestamp return tx, nil } - -func GetTransactionStateFromResult(txResult int) int { - switch txResult { - case int(pb.TransactionReceipt_TRANSACTION_RESULT_EXECUTED): - return int(pb.TransactionState_TRANSACTION_STATE_PROCESSED) - case int(pb.TransactionReceipt_TRANSACTION_RESULT_BAD_COUNTER): - return int(pb.TransactionState_TRANSACTION_STATE_CONFLICTING) - case int(pb.TransactionReceipt_TRANSACTION_RESULT_RUNTIME_EXCEPTION): - return int(pb.TransactionState_TRANSACTION_STATE_REJECTED) - case int(pb.TransactionReceipt_TRANSACTION_RESULT_INSUFFICIENT_GAS): - return int(pb.TransactionState_TRANSACTION_STATE_INSUFFICIENT_FUNDS) - case int(pb.TransactionReceipt_TRANSACTION_RESULT_INSUFFICIENT_FUNDS): - return int(pb.TransactionState_TRANSACTION_STATE_INSUFFICIENT_FUNDS) - } - return int(pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED) -} diff --git a/storage/storage.go b/storage/storage.go index 8097dff..4a05a68 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -256,12 +256,17 @@ func (s *Storage) OnReward(in *pb.Reward) { s.updateEpochs() // trigger epoch stat recalculation todo: optimize this } -func (s *Storage) OnTransactionReceipt(in *pb.TransactionReceipt) { - log.Info("OnTransactionReceipt(%+v)", in) - err := s.UpdateTransaction(context.Background(), model.NewTransactionReceipt(in)) +func (s *Storage) OnTransactionResult(res *pb.TransactionResult, state *pb.TransactionState) { + log.Info("OnTransactionReceipt(%+v, %+v)", res, state) + tx, err := model.NewTransactionResult(res, state, s.NetworkInfo) + if err != nil { + log.Err(fmt.Errorf("OnTransactionResult: error %v", err)) + } + + err = s.SaveTransactionResult(context.Background(), tx) //TODO: better error handling if err != nil { - log.Err(fmt.Errorf("OnTransactionReceipt: error %v", err)) + log.Err(fmt.Errorf("OnTransactionResult: error %v", err)) } } diff --git a/storage/tx.go b/storage/tx.go index 9bd825b..ef479fb 100644 --- a/storage/tx.go +++ b/storage/tx.go @@ -39,10 +39,10 @@ func (s *Storage) GetTransaction(parent context.Context, query *bson.D) (*model. } if !cursor.Next(ctx) { log.Info("GetTransaction: Empty result") - return nil, errors.New("Empty result") + return nil, errors.New("empty result") } doc := cursor.Current - account := &model.Transaction{ + tx := &model.Transaction{ Id: utils.GetAsString(doc.Lookup("id")), Layer: utils.GetAsUInt32(doc.Lookup("layer")), Block: utils.GetAsString(doc.Lookup("block")), @@ -63,7 +63,7 @@ func (s *Storage) GetTransaction(parent context.Context, query *bson.D) (*model. Receiver: utils.GetAsString(doc.Lookup("receiver")), SvmData: utils.GetAsString(doc.Lookup("svmData")), } - return account, nil + return tx, nil } func (s *Storage) GetTransactionsCount(parent context.Context, query *bson.D, opts ...*options.CountOptions) int64 { @@ -141,6 +141,12 @@ func (s *Storage) GetTransactions(parent context.Context, query *bson.D, opts .. func (s *Storage) SaveTransaction(parent context.Context, in *model.Transaction) error { ctx, cancel := context.WithTimeout(parent, 5*time.Second) defer cancel() + + transaction, err := s.GetTransaction(ctx, &bson.D{{Key: "id", Value: in.Id}}) + if err != nil && err.Error() != "empty result" { + return err + } + tx := bson.D{ { Key: "$set", @@ -164,10 +170,40 @@ func (s *Storage) SaveTransaction(parent context.Context, in *model.Transaction) {Key: "sender", Value: in.Sender}, {Key: "receiver", Value: in.Receiver}, {Key: "svmData", Value: in.SvmData}, + {Key: "message", Value: in.Message}, + {Key: "touchedAddresses", Value: in.TouchedAddresses}, }, }, } - _, err := s.db.Collection("txs").UpdateOne(ctx, + + if transaction != nil { + tx = bson.D{ + { + Key: "$set", + Value: bson.D{ + {Key: "id", Value: in.Id}, + {Key: "layer", Value: in.Layer}, + {Key: "block", Value: in.Block}, + {Key: "blockIndex", Value: in.BlockIndex}, + {Key: "index", Value: in.Index}, + {Key: "timestamp", Value: in.Timestamp}, + {Key: "maxGas", Value: in.MaxGas}, + {Key: "gasPrice", Value: in.GasPrice}, + {Key: "fee", Value: in.Fee}, + {Key: "amount", Value: in.Amount}, + {Key: "counter", Value: in.Counter}, + {Key: "type", Value: in.Type}, + {Key: "signature", Value: in.Signature}, + {Key: "pubKey", Value: in.PublicKey}, + {Key: "sender", Value: in.Sender}, + {Key: "receiver", Value: in.Receiver}, + {Key: "svmData", Value: in.SvmData}, + }, + }, + } + } + + _, err = s.db.Collection("txs").UpdateOne(ctx, bson.D{{Key: "id", Value: in.Id}}, tx, options.Update().SetUpsert(true)) if err != nil { log.Info("SaveTransaction: %v obj: %+v", err, tx) @@ -208,20 +244,64 @@ func (s *Storage) SaveTransactions(parent context.Context, in map[string]*model. return nil } -func (s *Storage) UpdateTransaction(parent context.Context, in *model.TransactionReceipt) error { +func (s *Storage) SaveTransactionResult(parent context.Context, in *model.Transaction) error { ctx, cancel := context.WithTimeout(parent, 5*time.Second) defer cancel() - _, err := s.db.Collection("txs").UpdateOne(ctx, bson.D{{Key: "id", Value: in.Id}}, bson.D{ - {Key: "$set", Value: bson.D{ - {Key: "index", Value: in.Index}, - {Key: "state", Value: model.GetTransactionStateFromResult(in.Result)}, - {Key: "gasUsed", Value: in.GasUsed}, - {Key: "fee", Value: in.Fee}, - {Key: "svmData", Value: in.SvmData}, - }}, - }) + + transaction, err := s.GetTransaction(ctx, &bson.D{{Key: "id", Value: in.Id}}) + if err != nil && err.Error() != "empty result" { + return err + } + + tx := bson.D{ + { + Key: "$set", + Value: bson.D{ + {Key: "id", Value: in.Id}, + {Key: "layer", Value: in.Layer}, + {Key: "block", Value: in.Block}, + {Key: "blockIndex", Value: in.BlockIndex}, + {Key: "index", Value: in.Index}, + {Key: "state", Value: in.State}, + {Key: "timestamp", Value: in.Timestamp}, + {Key: "maxGas", Value: in.MaxGas}, + {Key: "gasPrice", Value: in.GasPrice}, + {Key: "gasUsed", Value: in.GasUsed}, + {Key: "fee", Value: in.Fee}, + {Key: "amount", Value: in.Amount}, + {Key: "counter", Value: in.Counter}, + {Key: "type", Value: in.Type}, + {Key: "signature", Value: in.Signature}, + {Key: "pubKey", Value: in.PublicKey}, + {Key: "sender", Value: in.Sender}, + {Key: "receiver", Value: in.Receiver}, + {Key: "svmData", Value: in.SvmData}, + {Key: "message", Value: in.Message}, + {Key: "touchedAddresses", Value: in.TouchedAddresses}, + }, + }, + } + + if transaction != nil { + tx = bson.D{ + { + Key: "$set", + Value: bson.D{ + {Key: "id", Value: in.Id}, + {Key: "state", Value: in.State}, + {Key: "gasUsed", Value: in.GasUsed}, + {Key: "fee", Value: in.Fee}, + {Key: "message", Value: in.Message}, + {Key: "touchedAddresses", Value: in.TouchedAddresses}, + }, + }, + } + } + + _, err = s.db.Collection("txs").UpdateOne(ctx, + bson.D{{Key: "id", Value: in.Id}}, tx, options.Update().SetUpsert(true)) if err != nil { - log.Info("UpdateTransaction: %v", err) + log.Info("SaveTransactionResult: %v obj: %+v", err, tx) } return err } diff --git a/test/testseed/generator.go b/test/testseed/generator.go index d50d5c3..1ace161 100644 --- a/test/testseed/generator.go +++ b/test/testseed/generator.go @@ -335,7 +335,7 @@ func generateTransaction(index int, layer *model.Layer, senderSigner *signing.Ed Block: block.Id, BlockIndex: uint32(index), Index: 0, - State: int(pb.TransactionState_TRANSACTION_STATE_PROCESSED), + State: int(pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED), Timestamp: layer.Start, MaxGas: maxGas, GasPrice: gasPrice,