From fa1cfa16c86818140a34483b6f6c89d06e89e7f9 Mon Sep 17 00:00:00 2001 From: Olga Telezhnaya Date: Tue, 24 Oct 2023 09:49:06 +0100 Subject: [PATCH] jsonrpc: add finality to tx requests (`tx`, `EXPERIMENTAL_tx_status`), add method `broadcast_tx` (#9644) Fixes https://github.com/near/nearcore/issues/6837 I'll need to update the [docs](https://github.com/near/docs) and [jsonrpc_client](https://github.com/near/near-jsonrpc-client-rs) (and maybe something else, I haven't already checked) Important updates: 1. We have a new concept for all tx-related methods - `finality`. `finality` in the response is merged to master 2 weeks ago: https://github.com/near/nearcore/pull/9556. `finality` field in the request means "I want at least this level of confidence". So, the stricter the level, the longer the user needs to wait. 2. I decided to use `Final` as a default `finality` value because it gives the strongest guarantee and does not change backward compatibility for any of the methods (though, the waiting time may be increased sometimes to achieve the strictest level of confidence). 3. Methods `tx`, `EXPERIMENTAL_tx_status` have `finality` as an additional optional field. 4. A new method appeared - `broadcast_tx`. It allows to send the tx, it also have the optional field `finality`. 6. `broadcast_tx_async` is equal to `broadcast_tx` with hardcoded `finality None`, I'll mark it as deprecated in the doc 7. `broadcast_tx_commit` is equal to `broadcast_tx` with hardcoded `finality Final`, I'll mark it as deprecated in the doc. --- chain/client/src/view_client.rs | 62 ++++---- .../src/types/transactions.rs | 66 ++++++-- chain/jsonrpc/CHANGELOG.md | 10 +- chain/jsonrpc/client/src/lib.rs | 11 +- chain/jsonrpc/jsonrpc-tests/src/lib.rs | 2 +- .../jsonrpc/jsonrpc-tests/tests/rpc_query.rs | 6 +- .../jsonrpc-tests/tests/rpc_transactions.rs | 45 ++++-- chain/jsonrpc/src/api/mod.rs | 8 - chain/jsonrpc/src/api/transactions.rs | 115 ++++++++++++-- chain/jsonrpc/src/lib.rs | 146 ++++++++---------- core/primitives/src/transaction.rs | 33 +++- core/primitives/src/views.rs | 9 +- .../src/tests/nearcore/rpc_nodes.rs | 11 +- integration-tests/src/user/rpc_user.rs | 14 +- 14 files changed, 350 insertions(+), 188 deletions(-) diff --git a/chain/client/src/view_client.rs b/chain/client/src/view_client.rs index d2d52d01aa0..adfa22b7dd2 100644 --- a/chain/client/src/view_client.rs +++ b/chain/client/src/view_client.rs @@ -417,41 +417,41 @@ impl ViewClientActor { } } + // Return the lowest status the node can proof fn get_tx_execution_status( &self, execution_outcome: &FinalExecutionOutcomeView, ) -> Result { - Ok(match execution_outcome.transaction_outcome.outcome.status { - // Return the lowest status the node can proof. - ExecutionStatusView::Unknown => TxExecutionStatus::None, - _ => { - if execution_outcome - .receipts_outcome - .iter() - .all(|e| e.outcome.status != ExecutionStatusView::Unknown) - { - let block_hashes: BTreeSet = - execution_outcome.receipts_outcome.iter().map(|e| e.block_hash).collect(); - let mut headers = vec![]; - for block_hash in block_hashes { - headers.push(self.chain.get_block_header(&block_hash)?); - } - // We can't sort and check only the last block; - // previous blocks may be not in the canonical chain - match self.chain.check_blocks_final_and_canonical(&headers) { - Ok(_) => TxExecutionStatus::Final, - Err(_) => TxExecutionStatus::Executed, - } - } else { - match self.chain.check_blocks_final_and_canonical(&[self - .chain - .get_block_header(&execution_outcome.transaction_outcome.block_hash)?]) - { - Ok(_) => TxExecutionStatus::InclusionFinal, - Err(_) => TxExecutionStatus::Inclusion, - } - } - } + if execution_outcome.transaction_outcome.outcome.status == ExecutionStatusView::Unknown { + return Ok(TxExecutionStatus::None); + } + + if let Err(_) = self.chain.check_blocks_final_and_canonical(&[self + .chain + .get_block_header(&execution_outcome.transaction_outcome.block_hash)?]) + { + return Ok(TxExecutionStatus::Included); + } + + if execution_outcome + .receipts_outcome + .iter() + .any(|e| e.outcome.status == ExecutionStatusView::Unknown) + { + return Ok(TxExecutionStatus::IncludedFinal); + } + + let block_hashes: BTreeSet = + execution_outcome.receipts_outcome.iter().map(|e| e.block_hash).collect(); + let mut headers = vec![]; + for block_hash in block_hashes { + headers.push(self.chain.get_block_header(&block_hash)?); + } + // We can't sort and check only the last block; + // previous blocks may be not in the canonical chain + Ok(match self.chain.check_blocks_final_and_canonical(&headers) { + Err(_) => TxExecutionStatus::Executed, + Ok(_) => TxExecutionStatus::Final, }) } diff --git a/chain/jsonrpc-primitives/src/types/transactions.rs b/chain/jsonrpc-primitives/src/types/transactions.rs index b83c43fce44..ac807cc39f4 100644 --- a/chain/jsonrpc-primitives/src/types/transactions.rs +++ b/chain/jsonrpc-primitives/src/types/transactions.rs @@ -2,26 +2,33 @@ use near_primitives::hash::CryptoHash; use near_primitives::types::AccountId; use serde_json::Value; -#[derive(Debug, Clone)] -pub struct RpcBroadcastTransactionRequest { +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct RpcSendTransactionRequest { + #[serde(rename = "signed_tx_base64")] pub signed_transaction: near_primitives::transaction::SignedTransaction, + #[serde(default)] + pub wait_until: near_primitives::views::TxExecutionStatus, } #[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct RpcTransactionStatusCommonRequest { +pub struct RpcTransactionStatusRequest { #[serde(flatten)] pub transaction_info: TransactionInfo, + #[serde(default)] + pub wait_until: near_primitives::views::TxExecutionStatus, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] #[serde(untagged)] pub enum TransactionInfo { - #[serde(skip_deserializing)] - Transaction(near_primitives::transaction::SignedTransaction), - TransactionId { - tx_hash: CryptoHash, - sender_account_id: AccountId, - }, + Transaction(SignedTransaction), + TransactionId { tx_hash: CryptoHash, sender_account_id: AccountId }, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub enum SignedTransaction { + #[serde(rename = "signed_tx_base64")] + SignedTransaction(near_primitives::transaction::SignedTransaction), } #[derive(thiserror::Error, Debug, serde::Serialize, serde::Deserialize)] @@ -56,21 +63,46 @@ pub struct RpcBroadcastTxSyncResponse { pub transaction_hash: near_primitives::hash::CryptoHash, } -impl From for RpcTransactionStatusCommonRequest { - fn from(transaction_info: TransactionInfo) -> Self { - Self { transaction_info } +impl TransactionInfo { + pub fn from_signed_tx(tx: near_primitives::transaction::SignedTransaction) -> Self { + Self::Transaction(SignedTransaction::SignedTransaction(tx)) } -} -impl From for RpcTransactionStatusCommonRequest { - fn from(transaction_info: near_primitives::transaction::SignedTransaction) -> Self { - Self { transaction_info: transaction_info.into() } + pub fn to_signed_tx(&self) -> Option<&near_primitives::transaction::SignedTransaction> { + match self { + TransactionInfo::Transaction(tx) => match tx { + SignedTransaction::SignedTransaction(tx) => Some(tx), + }, + TransactionInfo::TransactionId { .. } => None, + } + } + + pub fn to_tx_hash_and_account(&self) -> (CryptoHash, &AccountId) { + match self { + TransactionInfo::Transaction(tx) => match tx { + SignedTransaction::SignedTransaction(tx) => { + (tx.get_hash(), &tx.transaction.signer_id) + } + }, + TransactionInfo::TransactionId { tx_hash, sender_account_id } => { + (*tx_hash, sender_account_id) + } + } } } impl From for TransactionInfo { fn from(transaction_info: near_primitives::transaction::SignedTransaction) -> Self { - Self::Transaction(transaction_info) + Self::Transaction(SignedTransaction::SignedTransaction(transaction_info)) + } +} + +impl From for RpcTransactionResponse { + fn from(view: near_primitives::views::TxStatusView) -> Self { + Self { + final_execution_outcome: view.execution_outcome, + final_execution_status: view.status, + } } } diff --git a/chain/jsonrpc/CHANGELOG.md b/chain/jsonrpc/CHANGELOG.md index 5240e7686f8..bef79315eb7 100644 --- a/chain/jsonrpc/CHANGELOG.md +++ b/chain/jsonrpc/CHANGELOG.md @@ -2,14 +2,16 @@ ## 0.2.3 -* Added `final_execution_status` field to the response of methods `tx`, `EXPERIMENTAL_tx_status`, `broadcast_tx_commit` +* Added `send_tx` method which gives configurable execution guarantees options and potentially replaces existing `broadcast_tx_async`, `broadcast_tx_commit` +* Field `final_execution_status` is presented in the response of methods `tx`, `EXPERIMENTAL_tx_status`, `broadcast_tx_commit`, `send_tx` +* Allowed use json in request for methods `EXPERIMENTAL_tx_status`, `tx`, `broadcast_tx_commit`, `broadcast_tx_async`, `send_tx` +* Parameter `wait_until` (same entity as `final_execution_status` in the response) is presented as optional request parameter for methods `EXPERIMENTAL_tx_status`, `tx`, `send_tx`. The response will be returned only when the desired level of finality is reached ### Breaking changes -* Response from `broadcast_tx_async` changed the type from String to Dict. - The response has the same structure as in `broadcast_tx_commit`. - It no longer contains transaction hash (which may be retrieved from Signed Transaction passed in request). * Removed `EXPERIMENTAL_check_tx` method. Use `tx` method instead +* `EXPERIMENTAL_tx_status`, `tx` methods now wait for recently sent tx (~3-6 seconds) and then show it. Previously, `UnknownTransaction` was immediately returned +* `EXPERIMENTAL_tx_status`, `tx` methods wait 10 seconds and then return `TimeoutError` for never existed transactions. Previously, `UnknownTransaction` was immediately returned ## 0.2.2 diff --git a/chain/jsonrpc/client/src/lib.rs b/chain/jsonrpc/client/src/lib.rs index 7b62d61f2ba..d8556354950 100644 --- a/chain/jsonrpc/client/src/lib.rs +++ b/chain/jsonrpc/client/src/lib.rs @@ -5,10 +5,12 @@ use near_jsonrpc_primitives::message::{from_slice, Message}; use near_jsonrpc_primitives::types::changes::{ RpcStateChangesInBlockByTypeRequest, RpcStateChangesInBlockByTypeResponse, }; -use near_jsonrpc_primitives::types::transactions::RpcTransactionResponse; +use near_jsonrpc_primitives::types::transactions::{ + RpcTransactionResponse, RpcTransactionStatusRequest, +}; use near_jsonrpc_primitives::types::validator::RpcValidatorsOrderedRequest; use near_primitives::hash::CryptoHash; -use near_primitives::types::{AccountId, BlockId, BlockReference, MaybeBlockId, ShardId}; +use near_primitives::types::{BlockId, BlockReference, MaybeBlockId, ShardId}; use near_primitives::views::validator_stake_view::ValidatorStakeView; use near_primitives::views::{ BlockView, ChunkView, EpochValidatorInfo, GasPriceView, StatusResponse, @@ -186,7 +188,6 @@ jsonrpc_client!(pub struct JsonRpcClient { #[allow(non_snake_case)] pub fn EXPERIMENTAL_tx_status(&self, tx: String) -> RpcRequest; pub fn health(&self) -> RpcRequest<()>; - pub fn tx(&self, hash: String, account_id: AccountId) -> RpcRequest; pub fn chunk(&self, id: ChunkId) -> RpcRequest; pub fn validators(&self, block_id: MaybeBlockId) -> RpcRequest; pub fn gas_price(&self, block_id: MaybeBlockId) -> RpcRequest; @@ -218,6 +219,10 @@ impl JsonRpcClient { call_method(&self.client, &self.server_addr, "block", request) } + pub fn tx(&self, request: RpcTransactionStatusRequest) -> RpcRequest { + call_method(&self.client, &self.server_addr, "tx", request) + } + #[allow(non_snake_case)] pub fn EXPERIMENTAL_changes( &self, diff --git a/chain/jsonrpc/jsonrpc-tests/src/lib.rs b/chain/jsonrpc/jsonrpc-tests/src/lib.rs index 40ced21b0d3..4ce459c6b33 100644 --- a/chain/jsonrpc/jsonrpc-tests/src/lib.rs +++ b/chain/jsonrpc/jsonrpc-tests/src/lib.rs @@ -33,7 +33,7 @@ pub fn start_all_with_validity_period_and_no_epoch_sync( enable_doomslug: bool, ) -> (Addr, tcp::ListenerAddr) { let actor_handles = setup_no_network_with_validity_period_and_no_epoch_sync( - vec!["test1".parse().unwrap(), "test2".parse().unwrap()], + vec!["test1".parse().unwrap()], if let NodeType::Validator = node_type { "test1".parse().unwrap() } else { diff --git a/chain/jsonrpc/jsonrpc-tests/tests/rpc_query.rs b/chain/jsonrpc/jsonrpc-tests/tests/rpc_query.rs index 28568d03f29..89fe598db60 100644 --- a/chain/jsonrpc/jsonrpc-tests/tests/rpc_query.rs +++ b/chain/jsonrpc/jsonrpc-tests/tests/rpc_query.rs @@ -89,7 +89,7 @@ fn test_block_query() { fn test_chunk_by_hash() { test_with_client!(test_utils::NodeType::NonValidator, client, async move { let chunk = client.chunk(ChunkId::BlockShardId(BlockId::Height(0), 0u64)).await.unwrap(); - assert_eq!(chunk.author, "test2".parse().unwrap()); + assert_eq!(chunk.author, "test1".parse().unwrap()); assert_eq!(chunk.header.balance_burnt, 0); assert_eq!(chunk.header.chunk_hash.as_ref().len(), 32); assert_eq!(chunk.header.encoded_length, 8); @@ -454,7 +454,7 @@ fn test_validators_ordered() { .unwrap(); assert_eq!( validators.into_iter().map(|v| v.take_account_id()).collect::>(), - vec!["test1".parse().unwrap(), "test2".parse().unwrap()] + vec!["test1".parse().unwrap()] ) }); } @@ -560,7 +560,7 @@ fn test_get_chunk_with_object_in_params() { ) .await .unwrap(); - assert_eq!(chunk.author, "test2".parse().unwrap()); + assert_eq!(chunk.author, "test1".parse().unwrap()); assert_eq!(chunk.header.balance_burnt, 0); assert_eq!(chunk.header.chunk_hash.as_ref().len(), 32); assert_eq!(chunk.header.encoded_length, 8); diff --git a/chain/jsonrpc/jsonrpc-tests/tests/rpc_transactions.rs b/chain/jsonrpc/jsonrpc-tests/tests/rpc_transactions.rs index e396189a3fd..54e286197a6 100644 --- a/chain/jsonrpc/jsonrpc-tests/tests/rpc_transactions.rs +++ b/chain/jsonrpc/jsonrpc-tests/tests/rpc_transactions.rs @@ -7,6 +7,7 @@ use futures::{future, FutureExt, TryFutureExt}; use near_actix_test_utils::run_actix; use near_crypto::{InMemorySigner, KeyType}; use near_jsonrpc::client::new_client; +use near_jsonrpc_primitives::types::transactions::{RpcTransactionStatusRequest, TransactionInfo}; use near_network::test_utils::WaitOrTimeoutActor; use near_o11y::testonly::{init_integration_logger, init_test_logger}; use near_primitives::hash::{hash, CryptoHash}; @@ -59,7 +60,13 @@ fn test_send_tx_async() { if let Some(tx_hash) = *tx_hash2_2.lock().unwrap() { actix::spawn( client1 - .tx(tx_hash.to_string(), signer_account_id) + .tx(RpcTransactionStatusRequest { + transaction_info: TransactionInfo::TransactionId { + tx_hash, + sender_account_id: signer_account_id, + }, + wait_until: TxExecutionStatus::Executed, + }) .map_err(|err| println!("Error: {:?}", err)) .map_ok(|result| { if let FinalExecutionStatus::SuccessValue(_) = @@ -200,7 +207,14 @@ fn test_replay_protection() { #[test] fn test_tx_status_missing_tx() { test_with_client!(test_utils::NodeType::Validator, client, async move { - match client.tx(CryptoHash::new().to_string(), "test1".parse().unwrap()).await { + let request = RpcTransactionStatusRequest { + transaction_info: TransactionInfo::TransactionId { + tx_hash: CryptoHash::new(), + sender_account_id: "test1".parse().unwrap(), + }, + wait_until: TxExecutionStatus::None, + }; + match client.tx(request).await { Err(e) => { let s = serde_json::to_string(&e.data.unwrap()).unwrap(); assert_eq!(s, "\"Transaction 11111111111111111111111111111111 doesn't exist\""); @@ -215,16 +229,23 @@ fn test_check_invalid_tx() { test_with_client!(test_utils::NodeType::Validator, client, async move { let signer = InMemorySigner::from_seed("test1".parse().unwrap(), KeyType::ED25519, "test1"); // invalid base hash - let tx = SignedTransaction::send_money( - 1, - "test1".parse().unwrap(), - "test2".parse().unwrap(), - &signer, - 100, - hash(&[1]), - ); - if let Ok(_) = client.tx(tx.get_hash().to_string(), "test1".parse().unwrap()).await { - panic!("transaction should not succeed"); + let request = RpcTransactionStatusRequest { + transaction_info: TransactionInfo::from_signed_tx(SignedTransaction::send_money( + 1, + "test1".parse().unwrap(), + "test2".parse().unwrap(), + &signer, + 100, + hash(&[1]), + )), + wait_until: TxExecutionStatus::None, + }; + match client.tx(request).await { + Err(e) => { + let s = serde_json::to_string(&e.data.unwrap()).unwrap(); + assert_eq!(s, "{\"TxExecutionError\":{\"InvalidTxError\":\"Expired\"}}"); + } + Ok(_) => panic!("transaction should not succeed"), } }); } diff --git a/chain/jsonrpc/src/api/mod.rs b/chain/jsonrpc/src/api/mod.rs index 6a60c8fa290..6c529b255c4 100644 --- a/chain/jsonrpc/src/api/mod.rs +++ b/chain/jsonrpc/src/api/mod.rs @@ -128,14 +128,6 @@ mod params { self.0.unwrap_or_else(Self::parse) } - /// Finish chain of parsing without trying of parsing to `T` directly - pub fn unwrap(self) -> Result { - match self.0 { - Ok(res) => res, - Err(e) => Err(RpcParseError(format!("Failed parsing args: {e}"))), - } - } - /// If value hasn’t been parsed yet and it’s a one-element array /// (i.e. singleton) deserialises the element and calls `func` on it. /// diff --git a/chain/jsonrpc/src/api/transactions.rs b/chain/jsonrpc/src/api/transactions.rs index d42e0e1b64e..32fb148ab10 100644 --- a/chain/jsonrpc/src/api/transactions.rs +++ b/chain/jsonrpc/src/api/transactions.rs @@ -3,28 +3,50 @@ use serde_json::Value; use near_client_primitives::types::TxStatusError; use near_jsonrpc_primitives::errors::RpcParseError; use near_jsonrpc_primitives::types::transactions::{ - RpcBroadcastTransactionRequest, RpcTransactionError, RpcTransactionStatusCommonRequest, - TransactionInfo, + RpcSendTransactionRequest, RpcTransactionError, RpcTransactionStatusRequest, TransactionInfo, }; use near_primitives::borsh::BorshDeserialize; use near_primitives::transaction::SignedTransaction; use super::{Params, RpcFrom, RpcRequest}; -impl RpcRequest for RpcBroadcastTransactionRequest { +impl RpcRequest for RpcSendTransactionRequest { fn parse(value: Value) -> Result { - let signed_transaction = - Params::new(value).try_singleton(|value| decode_signed_transaction(value)).unwrap()?; - Ok(Self { signed_transaction }) + let tx_request = Params::new(value) + .try_singleton(|value| { + Ok(RpcSendTransactionRequest { + signed_transaction: decode_signed_transaction(value)?, + // will be ignored in `broadcast_tx_async`, `broadcast_tx_commit` + wait_until: Default::default(), + }) + }) + .try_pair(|_: String, _: String| { + // Here, we restrict serde parsing object from the array + // `wait_until` is a new feature supported only in object + Err(RpcParseError( + "Unable to parse send request: too many params passed".to_string(), + )) + }) + .unwrap_or_parse()?; + Ok(tx_request) } } -impl RpcRequest for RpcTransactionStatusCommonRequest { +impl RpcRequest for RpcTransactionStatusRequest { fn parse(value: Value) -> Result { Ok(Params::new(value) - .try_singleton(|signed_tx| decode_signed_transaction(signed_tx).map(|x| x.into())) + .try_singleton(|signed_tx| { + Ok(RpcTransactionStatusRequest { + transaction_info: decode_signed_transaction(signed_tx)?.into(), + wait_until: Default::default(), + }) + }) .try_pair(|tx_hash, sender_account_id| { - Ok(TransactionInfo::TransactionId { tx_hash, sender_account_id }.into()) + Ok(RpcTransactionStatusRequest { + transaction_info: TransactionInfo::TransactionId { tx_hash, sender_account_id } + .into(), + wait_until: Default::default(), + }) }) .unwrap_or_parse()?) } @@ -62,7 +84,7 @@ fn decode_signed_transaction(value: String) -> Result process_method_call(request, |params| self.block(params)).await, "broadcast_tx_async" => { - process_method_call(request, |params| self.send_tx_async(params)).await + process_method_call(request, |params| async { + let tx = self.send_tx_async(params).await.to_string(); + Result::<_, std::convert::Infallible>::Ok(tx) + }) + .await } "broadcast_tx_commit" => { process_method_call(request, |params| self.send_tx_commit(params)).await @@ -331,6 +337,7 @@ impl JsonRpcHandler { process_method_call(request, |params| self.next_light_client_block(params)).await } "network_info" => process_method_call(request, |_params: ()| self.network_info()).await, + "send_tx" => process_method_call(request, |params| self.send_tx(params)).await, "status" => process_method_call(request, |_params: ()| self.status()).await, "tx" => { process_method_call(request, |params| self.tx_status_common(params, false)).await @@ -466,12 +473,10 @@ impl JsonRpcHandler { async fn send_tx_async( &self, - request_data: near_jsonrpc_primitives::types::transactions::RpcBroadcastTransactionRequest, - ) -> Result< - near_jsonrpc_primitives::types::transactions::RpcTransactionResponse, - near_jsonrpc_primitives::types::transactions::RpcTransactionError, - > { + request_data: near_jsonrpc_primitives::types::transactions::RpcSendTransactionRequest, + ) -> CryptoHash { let tx = request_data.signed_transaction; + let hash = tx.get_hash(); self.client_addr.do_send( ProcessTxRequest { transaction: tx, @@ -480,10 +485,7 @@ impl JsonRpcHandler { } .with_span_context(), ); - Ok(RpcTransactionResponse { - final_execution_outcome: None, - final_execution_status: TxExecutionStatus::None, - }) + hash } async fn tx_exists( @@ -530,23 +532,19 @@ impl JsonRpcHandler { })? } + /// Return status of the given transaction + /// + /// `finality` forces the execution to wait until the desired finality level is reached async fn tx_status_fetch( &self, tx_info: near_jsonrpc_primitives::types::transactions::TransactionInfo, + finality: near_primitives::views::TxExecutionStatus, fetch_receipt: bool, ) -> Result< near_jsonrpc_primitives::types::transactions::RpcTransactionResponse, near_jsonrpc_primitives::types::transactions::RpcTransactionError, > { - let (tx_hash, account_id) = match &tx_info { - near_jsonrpc_primitives::types::transactions::TransactionInfo::Transaction(tx) => { - (tx.get_hash(), tx.transaction.signer_id.clone()) - } - near_jsonrpc_primitives::types::transactions::TransactionInfo::TransactionId { - tx_hash, - sender_account_id, - } => (*tx_hash, sender_account_id.clone()), - }; + let (tx_hash, account_id) = tx_info.to_tx_hash_and_account(); timeout(self.polling_config.polling_timeout, async { loop { let tx_status_result = self.view_client_send( TxStatus { @@ -557,20 +555,17 @@ impl JsonRpcHandler { .await; match tx_status_result { Ok(result) => { - if let Some(outcome) = result.execution_outcome { - break Ok(RpcTransactionResponse { - final_execution_outcome: Some(outcome), - final_execution_status: result.status, - }) + if result.status >= finality { + break Ok(result.into()) } // else: No such transaction recorded on chain yet }, Err(err @ near_jsonrpc_primitives::types::transactions::RpcTransactionError::UnknownTransaction { .. }) => { - if let near_jsonrpc_primitives::types::transactions::TransactionInfo::Transaction(tx) = &tx_info { + if let Some(tx) = tx_info.to_signed_tx() { if let Ok(ProcessTxResponse::InvalidTx(context)) = - self.send_tx(tx.clone(), true).await + self.send_tx_internal(tx.clone(), true).await { break Err( near_jsonrpc_primitives::types::transactions::RpcTransactionError::InvalidTransaction { @@ -579,7 +574,9 @@ impl JsonRpcHandler { ); } } - break Err(err); + if finality == TxExecutionStatus::None { + break Err(err); + } } Err(err) => break Err(err), } @@ -598,46 +595,10 @@ impl JsonRpcHandler { })? } - async fn tx_polling( - &self, - tx_info: near_jsonrpc_primitives::types::transactions::TransactionInfo, - ) -> Result< - near_jsonrpc_primitives::types::transactions::RpcTransactionResponse, - near_jsonrpc_primitives::types::transactions::RpcTransactionError, - > { - timeout(self.polling_config.polling_timeout, async { - loop { - match self.tx_status_fetch(tx_info.clone(), false).await { - Ok(tx_status) => { - break Ok(tx_status) - } - // If transaction is missing, keep polling. - Err(near_jsonrpc_primitives::types::transactions::RpcTransactionError::UnknownTransaction { - .. - }) => {} - // If we hit any other error, we return to the user. - Err(err) => { - break Err(err.rpc_into()); - } - } - sleep(self.polling_config.polling_interval).await; - } - }) - .await - .map_err(|_| { - metrics::RPC_TIMEOUT_TOTAL.inc(); - tracing::warn!( - target: "jsonrpc", "Timeout: tx_polling method. tx_info {:?}", - tx_info, - ); - near_jsonrpc_primitives::types::transactions::RpcTransactionError::TimeoutError - })? - } - /// Send a transaction idempotently (subsequent send of the same transaction will not cause /// any new side-effects and the result will be the same unless we garbage collected it /// already). - async fn send_tx( + async fn send_tx_internal( &self, tx: SignedTransaction, check_only: bool, @@ -671,12 +632,12 @@ impl JsonRpcHandler { async fn send_tx_sync( &self, - request_data: near_jsonrpc_primitives::types::transactions::RpcBroadcastTransactionRequest, + request_data: near_jsonrpc_primitives::types::transactions::RpcSendTransactionRequest, ) -> Result< near_jsonrpc_primitives::types::transactions::RpcBroadcastTxSyncResponse, near_jsonrpc_primitives::types::transactions::RpcTransactionError, > { - match self.send_tx(request_data.clone().signed_transaction, false).await? { + match self.send_tx_internal(request_data.clone().signed_transaction, false).await? { ProcessTxResponse::ValidTx => { Ok(near_jsonrpc_primitives::types::transactions::RpcBroadcastTxSyncResponse { transaction_hash: request_data.signed_transaction.get_hash(), @@ -695,29 +656,28 @@ impl JsonRpcHandler { } } - async fn send_tx_commit( + async fn send_tx( &self, - request_data: near_jsonrpc_primitives::types::transactions::RpcBroadcastTransactionRequest, + request_data: near_jsonrpc_primitives::types::transactions::RpcSendTransactionRequest, ) -> Result< near_jsonrpc_primitives::types::transactions::RpcTransactionResponse, near_jsonrpc_primitives::types::transactions::RpcTransactionError, > { - let tx = request_data.signed_transaction; - match self.tx_status_fetch(tx.clone().into(), false).await - { - Ok(outcome) => { - return Ok(outcome); - } - Err(err @ near_jsonrpc_primitives::types::transactions::RpcTransactionError::InvalidTransaction { - .. - }) => { - return Err(err); - } - _ => {} + if request_data.wait_until == TxExecutionStatus::None { + self.send_tx_async(request_data).await; + return Ok(RpcTransactionResponse { + final_execution_outcome: None, + final_execution_status: TxExecutionStatus::None, + }); } - match self.send_tx(tx.clone(), false).await? { + let tx = request_data.signed_transaction; + match self.send_tx_internal(tx.clone(), false).await? { ProcessTxResponse::ValidTx | ProcessTxResponse::RequestRouted => { - self.tx_polling(tx.into()).await + self.tx_status_fetch( + near_jsonrpc_primitives::types::transactions::TransactionInfo::from_signed_tx(tx.clone()), + request_data.wait_until, + false, + ).await } network_client_response=> { Err( @@ -729,6 +689,20 @@ impl JsonRpcHandler { } } + async fn send_tx_commit( + &self, + request_data: near_jsonrpc_primitives::types::transactions::RpcSendTransactionRequest, + ) -> Result< + near_jsonrpc_primitives::types::transactions::RpcTransactionResponse, + near_jsonrpc_primitives::types::transactions::RpcTransactionError, + > { + self.send_tx(RpcSendTransactionRequest { + signed_transaction: request_data.signed_transaction, + wait_until: TxExecutionStatus::Final, + }) + .await + } + async fn health( &self, ) -> Result< @@ -871,13 +845,15 @@ impl JsonRpcHandler { async fn tx_status_common( &self, - request_data: near_jsonrpc_primitives::types::transactions::RpcTransactionStatusCommonRequest, + request_data: near_jsonrpc_primitives::types::transactions::RpcTransactionStatusRequest, fetch_receipt: bool, ) -> Result< near_jsonrpc_primitives::types::transactions::RpcTransactionResponse, near_jsonrpc_primitives::types::transactions::RpcTransactionError, > { - let tx_status = self.tx_status_fetch(request_data.transaction_info, fetch_receipt).await?; + let tx_status = self + .tx_status_fetch(request_data.transaction_info, request_data.wait_until, fetch_receipt) + .await?; Ok(tx_status.rpc_into()) } diff --git a/core/primitives/src/transaction.rs b/core/primitives/src/transaction.rs index ff2984db4af..c49b42c48e6 100644 --- a/core/primitives/src/transaction.rs +++ b/core/primitives/src/transaction.rs @@ -5,8 +5,11 @@ use crate::types::{AccountId, Balance, Gas, Nonce}; use borsh::{BorshDeserialize, BorshSerialize}; use near_crypto::{PublicKey, Signature}; use near_fmt::{AbbrBytes, Slice}; +use near_primitives_core::serialize::{from_base64, to_base64}; use near_primitives_core::types::Compute; use near_vm_runner::{ProfileDataV2, ProfileDataV3}; +use serde::de::Error as DecodeError; +use serde::ser::Error as EncodeError; use std::borrow::Borrow; use std::fmt; use std::hash::{Hash, Hasher}; @@ -44,7 +47,7 @@ impl Transaction { } } -#[derive(BorshSerialize, BorshDeserialize, serde::Serialize, Eq, Debug, Clone)] +#[derive(BorshSerialize, BorshDeserialize, Eq, Debug, Clone)] #[borsh(init=init)] pub struct SignedTransaction { pub transaction: Transaction, @@ -96,6 +99,34 @@ impl Borrow for SignedTransaction { } } +impl serde::Serialize for SignedTransaction { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let signed_tx_borsh = borsh::to_vec(self).map_err(|err| { + S::Error::custom(&format!("the value could not be borsh encoded due to: {}", err)) + })?; + let signed_tx_base64 = to_base64(&signed_tx_borsh); + serializer.serialize_str(&signed_tx_base64) + } +} + +impl<'de> serde::Deserialize<'de> for SignedTransaction { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let signed_tx_base64 = ::deserialize(deserializer)?; + let signed_tx_borsh = from_base64(&signed_tx_base64).map_err(|err| { + D::Error::custom(&format!("the value could not decoded from base64 due to: {}", err)) + })?; + borsh::from_slice::(&signed_tx_borsh).map_err(|err| { + D::Error::custom(&format!("the value could not decoded from borsh due to: {}", err)) + }) + } +} + /// The status of execution for a transaction or a receipt. #[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Default)] pub enum ExecutionStatus { diff --git a/core/primitives/src/views.rs b/core/primitives/src/views.rs index d42dde29516..54d8f8a65fb 100644 --- a/core/primitives/src/views.rs +++ b/core/primitives/src/views.rs @@ -1672,22 +1672,27 @@ pub struct TxStatusView { serde::Deserialize, Clone, Debug, + Default, Eq, PartialEq, + Ord, + PartialOrd, )] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub enum TxExecutionStatus { /// Transaction is waiting to be included into the block None, /// Transaction is included into the block. The block may be not finalised yet - Inclusion, + Included, /// Transaction is included into finalised block - InclusionFinal, + IncludedFinal, /// Transaction is included into finalised block + /// All the transaction receipts finished their execution. /// The corresponding blocks for each receipt may be not finalised yet Executed, /// Transaction is included into finalised block + /// Execution of transaction receipts is finalised + #[default] Final, } diff --git a/integration-tests/src/tests/nearcore/rpc_nodes.rs b/integration-tests/src/tests/nearcore/rpc_nodes.rs index 1e05e4fea4f..8824d291863 100644 --- a/integration-tests/src/tests/nearcore/rpc_nodes.rs +++ b/integration-tests/src/tests/nearcore/rpc_nodes.rs @@ -10,6 +10,7 @@ use near_actix_test_utils::spawn_interruptible; use near_client::{GetBlock, GetExecutionOutcome, GetValidatorInfo}; use near_crypto::{InMemorySigner, KeyType}; use near_jsonrpc::client::new_client; +use near_jsonrpc_primitives::types::transactions::{RpcTransactionStatusRequest, TransactionInfo}; use near_network::test_utils::WaitOrTimeoutActor; use near_o11y::testonly::init_integration_logger; use near_o11y::WithSpanContextExt; @@ -22,7 +23,9 @@ use near_primitives::types::{ BlockId, BlockReference, EpochId, EpochReference, Finality, TransactionOrReceiptId, }; use near_primitives::version::ProtocolVersion; -use near_primitives::views::{ExecutionOutcomeView, ExecutionStatusView, RuntimeConfigView}; +use near_primitives::views::{ + ExecutionOutcomeView, ExecutionStatusView, RuntimeConfigView, TxExecutionStatus, +}; use std::time::Duration; #[test] @@ -615,8 +618,12 @@ fn test_check_tx_on_lightclient_must_return_does_not_track_shard() { let res = view_client.send(GetBlock::latest().with_span_context()).await; if let Ok(Ok(block)) = res { if block.header.height > 10 { + let request = RpcTransactionStatusRequest { + transaction_info: TransactionInfo::from_signed_tx(transaction), + wait_until: TxExecutionStatus::None, + }; let _ = client - .tx(transaction.get_hash().to_string(), "near.1".parse().unwrap()) + .tx(request) .map_err(|err| { assert_eq!( err.data.unwrap(), diff --git a/integration-tests/src/user/rpc_user.rs b/integration-tests/src/user/rpc_user.rs index a4f07f3c481..2d3b2d69415 100644 --- a/integration-tests/src/user/rpc_user.rs +++ b/integration-tests/src/user/rpc_user.rs @@ -10,6 +10,7 @@ use near_jsonrpc::client::{new_client, JsonRpcClient}; use near_jsonrpc_client::ChunkId; use near_jsonrpc_primitives::errors::ServerError; use near_jsonrpc_primitives::types::query::{RpcQueryRequest, RpcQueryResponse}; +use near_jsonrpc_primitives::types::transactions::{RpcTransactionStatusRequest, TransactionInfo}; use near_primitives::hash::CryptoHash; use near_primitives::receipt::Receipt; use near_primitives::serialize::to_base64; @@ -20,7 +21,7 @@ use near_primitives::types::{ use near_primitives::views::{ AccessKeyView, AccountView, BlockView, CallResult, ChunkView, ContractCodeView, EpochValidatorInfo, ExecutionOutcomeView, FinalExecutionOutcomeView, QueryRequest, - ViewStateResult, + TxExecutionStatus, ViewStateResult, }; use crate::user::User; @@ -183,9 +184,14 @@ impl User for RpcUser { } fn get_transaction_final_result(&self, hash: &CryptoHash) -> FinalExecutionOutcomeView { - let account_id = self.account_id.clone(); - let hash = hash.to_string(); - self.actix(move |client| client.tx(hash, account_id)) + let request = RpcTransactionStatusRequest { + transaction_info: TransactionInfo::TransactionId { + tx_hash: *hash, + sender_account_id: self.account_id.clone(), + }, + wait_until: TxExecutionStatus::Final, + }; + self.actix(move |client| client.tx(request)) .unwrap() .final_execution_outcome .unwrap()