Skip to content

Commit

Permalink
Fix: repeated payment bug
Browse files Browse the repository at this point in the history
This patch updates to the latest version of Clarity and Web30 which
contain a fix specifically created for this situation.

By separating the prepare and send transaction steps we can more
carefully separate cases where we might have published a transaction
from those where there is no possibility.

Due to recent strange behavior on Gnosis chain, including reorgs and our
own rpc issues routers would repeatedly attempt to send the same payment
over and over again, these payments would then successfully enter the
chain but return an error to the router, which would simply continue to
retry the transaction. Resulting in a wallet draining bug.
  • Loading branch information
jkilpatr committed Oct 23, 2023
1 parent a790db7 commit 389d0a8
Show file tree
Hide file tree
Showing 22 changed files with 585 additions and 377 deletions.
606 changes: 383 additions & 223 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ inherits = "dev"
opt-level = 2

[workspace.dependencies]
deep_space = { version = "2", features = ["althea"], default-features = false }
deep_space = {version = "2", features = ["althea"], default-features=false}
web30 = "1.2"
clarity = "1.3"
awc = {version = "3.2", default-features = false, features=["openssl", "compress-gzip", "compress-zstd"]}
1 change: 0 additions & 1 deletion antenna_forwarding_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ fn process_messages(
ForwardingProtocolMessage::ConnectionCloseMessage { stream_id } => {
trace!("Got close message for stream {}", stream_id);
*last_message = Instant::now();
let stream_id = stream_id;
if let Some(stream) = streams.get(stream_id) {
let _res = stream.stream.shutdown(Shutdown::Both);
streams.remove(stream_id);
Expand Down
1 change: 1 addition & 0 deletions antenna_forwarding_protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ serde_json = "1.0"
serde_derive = "1.0"
serde = "1.0"
sodiumoxide = "0.2"
clarity = {workspace = true}
log = "0.4"
lazy_static = "1.4"

Expand Down
6 changes: 4 additions & 2 deletions auto_bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ authors = ["Jehan <[email protected]>, Justin <[email protected]>"]
edition = "2018"

[dependencies]
web30 = "1.0"
web30 = {workspace = true}
num256 = "0.5"
clarity = "1.2"
clarity = {workspace = true}
rand = "0.8"
num = "0.4"
log = "0.4"
serde_derive = "1.0"
serde = "1.0"
Expand Down
15 changes: 9 additions & 6 deletions auto_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ impl TokenBridge {
// You basically just send it some dai to the bridge address and they show
// up in the same address on the xdai side we have no idea when this has succeeded
// since the events are not indexed
let tx_hash = self
let tx = self
.eth_web3
.send_transaction(
.prepare_transaction(
*DAI_CONTRACT_ON_ETH,
encode_call(
"transfer(address,uint256)",
Expand All @@ -133,6 +133,7 @@ impl TokenBridge {
Vec::new(),
)
.await?;
let tx_hash = self.eth_web3.send_prepared_transaction(tx).await?;

self.eth_web3
.wait_for_transaction(tx_hash, timeout, None)
Expand Down Expand Up @@ -178,16 +179,17 @@ impl TokenBridge {
bytes_to_hex_str(&payload),
);

let txid = self
let tx = self
.eth_web3
.send_transaction(
.prepare_transaction(
self.xdai_bridge_on_eth,
payload,
0u32.into(),
self.eth_privatekey,
Vec::new(),
)
.await?;
let txid = self.eth_web3.send_prepared_transaction(tx).await?;

let _ = self
.eth_web3
Expand Down Expand Up @@ -369,16 +371,17 @@ pub async fn encode_relaytokens(
let payload = encode_call("relayTokens(address)", &[dest_address.into()]).unwrap();
let options = Vec::new();

let tx_hash = bridge
let tx = bridge
.xdai_web3
.send_transaction(
.prepare_transaction(
bridge.xdai_bridge_on_xdai,
payload,
amount,
bridge.eth_privatekey,
options,
)
.await?;
let tx_hash = bridge.xdai_web3.send_prepared_transaction(tx).await?;

bridge
.xdai_web3
Expand Down
2 changes: 1 addition & 1 deletion clu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ log = "0.4"
ipgen = "1.0.1"
rand = "0.8"
serde_json = "1.0"
clarity = "1.2"
clarity = {workspace = true}
sodiumoxide = "0.2"
deep_space = { workspace = true }

Expand Down
10 changes: 5 additions & 5 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ rita_db_migration = { path = "../rita_db_migration" }
ctrlc = { version = "3.2.1", features = ["termination"] }
diesel = { version = "1.4", features = ["postgres", "r2d2"] }
diesel_migrations = { version = "1.4", features = ["postgres"] }
awc = "3.1"
awc = {workspace = true}
actix-rt = "2.8"
deep_space = { workspace = true }
clarity = "1.2"
deep_space = {workspace = true}
clarity = {workspace = true}
althea_proto = "0.3.0"
futures = { version = "0.3", features = ["compat"] }
num256 = "0.5"
num-traits = "0.2"
web30 = "1.0"
num-traits="0.2"
web30 = {workspace = true}
lazy_static = "1.4"
actix-web = { version = "4.3", default_features = false, features = [
"openssl",
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ pub async fn send_eth_bulk(amount: Uint256, destinations: &[clarity::Address], w
}
let mut txids = Vec::new();
for tx in transactions.iter() {
let txid = web3.eth_send_raw_transaction(tx.to_bytes()).await;
let txid = web3.send_prepared_transaction(tx.clone()).await;
info!("{:?}", txid);
txids.push(txid);
}
Expand Down
14 changes: 7 additions & 7 deletions rita_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ phonenumber = "0.3"
babel_monitor = { path = "../babel_monitor" }
arrayvec = { version = "0.7", features = ["serde"] }
sodiumoxide = "0.2"
web30 = "1.0"
awc = "3.1"
clu = { path = "../clu" }
web30 = {workspace = true}
awc = {workspace = true}
ipnetwork = "0.20"
actix-async = { package = "actix", version = "0.13" }
actix-web-async = { package = "actix-web", version = "4.3", default_features = false, features = [
"openssl",
] }
clarity = "1.2"
actix-async = {package="actix", version = "0.13"}
actix-web-async = { package="actix-web", version = "4.3", default_features = false, features= ["openssl"]}
actix-web-httpauth-async = { package="actix-web-httpauth", version = "0.8.0"}
clarity = {workspace = true}
openssh-keys = "0.6"
mac_address = "1.1.4"
futures = { version = "0.3", features = ["compat"] }
Expand Down
45 changes: 23 additions & 22 deletions rita_client/src/operator_fee_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ pub async fn tick_operator_payments() {
let full_node = get_web3_server();
let web3 = Web3::new(&full_node, TRANSACTION_SUBMISSION_TIMEOUT);

let transaction_status = web3
.send_transaction(
let tx = web3
.prepare_transaction(
operator_address,
Vec::new(),
amount_to_pay,
Expand All @@ -137,26 +137,27 @@ pub async fn tick_operator_payments() {
],
)
.await;

// in theory this may fail to get into a block, for now there is no handler and
// we will just underpay when that occurs. Failure to successfully submit the tx
// will be properly retried
match transaction_status {
Ok(txid) => {
info!(
"Successfully paid the operator {} wei with txid: {:#066x}!",
amount_to_pay, txid
);
update_payments(PaymentTx {
to: operator_identity,
from: our_id,
amount: amount_to_pay,
txid,
});
add_tx_to_total(amount_to_pay);
state.operator_debt -= amount_to_pay;
set_operator_fee_data(state);
}
match tx {
Ok(tx) => match web3.send_prepared_transaction(tx).await {
Ok(txid) => {
info!(
"Successfully paid the operator {} wei with txid: {:#066x}!",
amount_to_pay, txid
);
update_payments(PaymentTx {
to: operator_identity,
from: our_id,
amount: amount_to_pay,
txid,
});
add_tx_to_total(amount_to_pay);
state.operator_debt -= amount_to_pay;
set_operator_fee_data(state);
}
Err(e) => {
warn!("Failed to pay the operator! {:?}", e);
}
},
Err(e) => {
warn!("Failed to pay the operator! {:?}", e);
}
Expand Down
5 changes: 3 additions & 2 deletions rita_client_registration/src/client_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ pub async fn add_client_to_registered_list(
wait_timeout: Option<Duration>,
options: Vec<SendTxOption>,
) -> Result<Uint256, Web3Error> {
let tx_hash = web30
.send_transaction(
let tx = web30
.prepare_transaction(
contract,
encode_call(
"add_registered_user((string,string,address))",
Expand All @@ -119,6 +119,7 @@ pub async fn add_client_to_registered_list(
options,
)
.await?;
let tx_hash = web30.send_prepared_transaction(tx).await?;

if let Some(timeout) = wait_timeout {
future_timeout(timeout, web30.wait_for_transaction(tx_hash, timeout, None)).await??;
Expand Down
22 changes: 16 additions & 6 deletions rita_client_registration/src/register_client_batch_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub fn register_client_batch_loop(
let mut batch = vec![];
for id in reg_clients {
match contact
.send_transaction(
.prepare_transaction(
contract_addr,
match encode_call(
"add_registered_user((string,string,address))",
Expand All @@ -103,11 +103,21 @@ pub fn register_client_batch_loop(
)
.await
{
Ok(tx_id) => {
//increment nonce for next tx
nonce += 1u64.into();
remove_client_from_reg_batch(id);
batch.push(tx_id);
Ok(tx) => {
match contact.send_prepared_transaction(tx).await {
Ok(txid) => {
//increment nonce for next tx
nonce += 1u64.into();
remove_client_from_reg_batch(id);
batch.push(txid);
}
Err(e) => {
error!(
"Failed registration for {} with {}",
id.wg_public_key, e
);
}
}
}
Err(e) => {
error!(
Expand Down
24 changes: 8 additions & 16 deletions rita_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,23 @@ auto-bridge = { path = "../auto_bridge" }
serde_json = "1.0"
log = { version = "0.4", features = ["release_max_level_info"] }
settings = { path = "../settings" }
clarity = "1.2"
clarity = {workspace = true}
futures = { version = "0.3", features = ["compat"] }
num256 = "0.5"
num-traits = "0.2"
bincode = "1.3"
serde_cbor = "0.11"
lazy_static = "1.4"
althea_kernel_interface = { path = "../althea_kernel_interface" }
actix-web-httpauth-async = { package = "actix-web-httpauth", version = "0.8.0" }
actix-web-async = { package = "actix-web", version = "4.3", default_features = false, features = [
"openssl",
] }
awc = { version = "3.1", default-features = false, features = [
"openssl",
"compress-gzip",
"compress-zstd",
] }
actix-web-httpauth-async = { package="actix-web-httpauth", version = "0.8.0"}
actix-web-async = { package="actix-web", version = "4.3", default_features = false, features= ["openssl"]}
awc = {workspace = true}
actix-service = "2.0.2"
web30 = "1.0"
web30 = {workspace = true}
althea_types = { path = "../althea_types" }
deep_space = { workspace = true }
prost-types = "0.12"
cosmos-sdk-proto-althea = { package = "cosmos-sdk-proto-althea", version = "0.16", features = [
"ethermint",
] }
deep_space = {workspace = true}
prost-types ="0.12"
cosmos-sdk-proto-althea = {package = "cosmos-sdk-proto-althea", version = "0.16", features = ["ethermint"]}

[dependencies.regex]
version = "1.6"
Expand Down
20 changes: 14 additions & 6 deletions rita_common/src/dashboard/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ pub async fn eth_compatible_withdraw(dest: Address, amount: Uint256) -> HttpResp
let web3 = Web3::new(&full_node, WITHDRAW_TIMEOUT);
let payment_settings = settings::get_rita_common().payment;

let transaction_status = web3
.send_transaction(
let tx = web3
.prepare_transaction(
dest,
Vec::new(),
amount,
Expand All @@ -98,10 +98,18 @@ pub async fn eth_compatible_withdraw(dest: Address, amount: Uint256) -> HttpResp
],
)
.await;
if let Err(e) = transaction_status {
HttpResponse::InternalServerError().json(format!("Withdraw failed with {:?} try again!", e))
} else {
HttpResponse::Ok().json(format!("Successful withdraw of {} to {}", amount, dest))
match tx {
Ok(tx) => {
let transaction_status = web3.send_prepared_transaction(tx).await;
if let Err(e) = transaction_status {
HttpResponse::InternalServerError()
.json(format!("Withdraw failed with {:?} try again!", e))
} else {
HttpResponse::Ok().json(format!("Successful withdraw of {} to {}", amount, dest))
}
}
Err(e) => HttpResponse::InternalServerError()
.json(format!("Withdraw failed with {:?} try again!", e)),
}
}

Expand Down
Loading

0 comments on commit 389d0a8

Please sign in to comment.